Skip to content

Commit f6dc7bc

Browse files
committed
Release 0.5.2
1 parent 5aee487 commit f6dc7bc

File tree

8 files changed

+89
-93
lines changed

8 files changed

+89
-93
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/target
1+
target/
22
**/*.rs.bk
33
Cargo.lock
44
.cargo

CHANGELOG.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11

22
# CHANGELOG
33

4-
## v0.5.2 2020-01-15
4+
## v0.5.2 2020-01-17
55
* Do health check for the connection before return it.
66
* Add configure item `health_check`.
7-
* Impl `Debug` for State.
7+
* Impl Debug for State.
8+
* Add method is_brand_new to Connection, which returns true if the connection is newly established.
9+
* Skip health check of those new connections.
810

911
## v0.5.1 2020-01-07
1012
* Switch to `async-trait`

mobc-redis/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@ edition = "2018"
88

99
[dependencies]
1010
mobc = { version = "0.5", path = ".." }
11-
redis = { version = "0.13.1-alpha.0", git = "https://github.com/mitsuhiko/redis-rs" }
11+
redis = "0.14"
1212

1313
[dev-dependencies]
1414
actix-web = "2.0.0"
1515
actix-rt = "1.0"
16-
actix-http = "1.0"
16+
actix-http = "1.0"
17+
tokio = { version = "0.2", features = ["full"]}

mobc-redis/examples/redis.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ use std::time::Instant;
77
async fn main() {
88
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
99
let manager = RedisConnectionManager::new(client);
10-
let pool = Pool::builder().max_open(100).build(manager);
10+
let pool = Pool::builder().max_open(20).build(manager);
1111

1212
const MAX: usize = 5000;
1313

1414
let now = Instant::now();
15-
let (tx, mut rx) = tokio::sync::mpsc::channel::<usize>(16);
15+
let (tx, mut rx) = tokio::sync::mpsc::channel::<usize>(5000);
1616
for i in 0..MAX {
1717
let pool = pool.clone();
1818
let mut tx_c = tx.clone();

src/lib.rs

Lines changed: 55 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ struct Conn<C, E> {
193193
#[allow(dead_code)]
194194
last_err: Mutex<Option<E>>,
195195
created_at: Instant,
196+
brand_new: bool,
196197
}
197198

198199
impl<C, E> Conn<C, E> {
@@ -436,35 +437,14 @@ impl<M: Manager> Pool<M> {
436437
loop {
437438
let timeout = delay_until(deadline);
438439
try_times += 1;
439-
match self
440-
.inner_get_ctx(GetStrategy::CachedOrNewConn, timeout)
441-
.await
442-
{
443-
Ok(mut conn) => {
444-
if !self.0.config.health_check {
445-
return Ok(conn);
446-
}
447-
return {
448-
let raw_conn = conn.conn.as_mut().unwrap().raw.take().unwrap();
449-
match self.0.manager.check(raw_conn).await {
450-
Ok(raw) => conn.conn.as_mut().unwrap().raw = Some(raw),
451-
Err(e) => {
452-
if try_times == config.max_bad_conn_retries {
453-
return Err(Error::Inner(e));
454-
}
455-
continue;
456-
}
457-
}
458-
Ok(conn)
459-
};
460-
}
440+
match self.get_ctx(GetStrategy::CachedOrNewConn, timeout).await {
441+
Ok(conn) => return Ok(conn),
461442
Err(Error::BadConn) => {
462443
if try_times == config.max_bad_conn_retries {
463444
let timeout = delay_until(deadline);
464-
return self
465-
.inner_get_ctx(GetStrategy::AlwaysNewConn, timeout)
466-
.await;
445+
return self.get_ctx(GetStrategy::AlwaysNewConn, timeout).await;
467446
}
447+
continue;
468448
}
469449
Err(err) => return Err(err),
470450
}
@@ -477,46 +457,52 @@ impl<M: Manager> Pool<M> {
477457
loop {
478458
let never_ot = futures::future::pending();
479459
try_times += 1;
480-
match self
481-
.inner_get_ctx(GetStrategy::CachedOrNewConn, never_ot)
482-
.await
483-
{
484-
Ok(mut conn) => {
485-
if !self.0.config.health_check {
486-
return Ok(conn);
487-
}
488-
return {
489-
let raw_conn = conn.conn.as_mut().unwrap().raw.take().unwrap();
490-
match self.0.manager.check(raw_conn).await {
491-
Ok(raw) => conn.conn.as_mut().unwrap().raw = Some(raw),
492-
Err(e) => {
493-
if try_times == config.max_bad_conn_retries {
494-
return Err(Error::Inner(e));
495-
}
496-
continue;
497-
}
498-
}
499-
Ok(conn)
500-
};
501-
}
460+
match self.get_ctx(GetStrategy::CachedOrNewConn, never_ot).await {
461+
Ok(conn) => return Ok(conn),
502462
Err(Error::BadConn) => {
503463
if try_times == config.max_bad_conn_retries {
504464
let never_ot = futures::future::pending();
505-
return self
506-
.inner_get_ctx(GetStrategy::AlwaysNewConn, never_ot)
507-
.await;
465+
return self.get_ctx(GetStrategy::AlwaysNewConn, never_ot).await;
508466
}
467+
continue;
509468
}
510469
Err(err) => return Err(err),
511470
}
512471
}
513472
}
514473

515-
async fn inner_get_ctx(
474+
async fn get_ctx(
516475
&self,
517476
strategy: GetStrategy,
518477
ctx: impl Future<Output = ()> + Unpin,
519478
) -> Result<Connection<M>, Error<M::Error>> {
479+
let mut c = self.inner_get_ctx(strategy, ctx).await?;
480+
let mut internals = self.0.internals.lock().await;
481+
if !c.brand_new && c.expired(internals.config.max_lifetime) {
482+
c.close(&mut internals);
483+
return Err(Error::BadConn);
484+
}
485+
486+
if !c.brand_new {
487+
let raw = c.raw.take().unwrap();
488+
match self.0.manager.check(raw).await {
489+
Ok(raw) => c.raw = Some(raw),
490+
Err(e) => return Err(Error::Inner(e)),
491+
}
492+
}
493+
494+
let conn = Connection {
495+
pool: Some(self.clone()),
496+
conn: Some(c),
497+
};
498+
Ok(conn)
499+
}
500+
501+
async fn inner_get_ctx(
502+
&self,
503+
strategy: GetStrategy,
504+
ctx: impl Future<Output = ()> + Unpin,
505+
) -> Result<Conn<M::Connection, M::Error>, Error<M::Error>> {
520506
let mut ctx = ctx.fuse();
521507

522508
let mut internals = self.0.internals.lock().await;
@@ -529,18 +515,7 @@ impl<M: Manager> Pool<M> {
529515
let num_free = internals.free_conns.len();
530516
if strategy == GetStrategy::CachedOrNewConn && num_free > 0 {
531517
let c = internals.free_conns.swap_remove(0);
532-
533-
if c.expired(internals.config.max_lifetime) {
534-
c.close(&mut internals);
535-
return Err(Error::BadConn);
536-
}
537-
538-
drop(internals);
539-
let pooled = Connection {
540-
pool: Some(self.clone()),
541-
conn: Some(c),
542-
};
543-
return Ok(pooled);
518+
return Ok(c);
544519
}
545520

546521
if internals.config.max_open > 0 {
@@ -564,19 +539,7 @@ impl<M: Manager> Pool<M> {
564539
}
565540
conn = req_recv.fuse() => {
566541
let c = conn.unwrap();
567-
568-
let mut internals = self.0.internals.lock().await;
569-
if c.expired(internals.config.max_lifetime) {
570-
c.close(&mut internals);
571-
return Err(Error::BadConn);
572-
}
573-
let pooled = Connection {
574-
pool: Some(self.clone()),
575-
conn: Some(c),
576-
};
577-
return Ok(pooled)
578-
579-
542+
return Ok(c)
580543
}
581544
}
582545
}
@@ -592,13 +555,10 @@ impl<M: Manager> Pool<M> {
592555
raw: Some(c),
593556
last_err: Mutex::new(None),
594557
created_at: Instant::now(),
595-
};
596-
let pooled = Connection {
597-
pool: Some(self.clone()),
598-
conn: Some(conn),
558+
brand_new: true,
599559
};
600560

601-
return Ok(pooled);
561+
return Ok(conn);
602562
}
603563
Err(e) => {
604564
let internals = self.0.internals.lock().await;
@@ -649,10 +609,11 @@ async fn open_new_connection<M: Manager>(shared: &Weak<SharedPool<M>>) {
649609
raw: Some(c),
650610
last_err: Mutex::new(None),
651611
created_at: Instant::now(),
612+
brand_new: true,
652613
};
653614
return put_conn(&shared, internals, conn).await;
654615
}
655-
Err(e) => {
616+
Err(_) => {
656617
internals.num_open -= 1;
657618
return maybe_open_new_connection(&shared, internals).await;
658619
}
@@ -662,16 +623,20 @@ async fn open_new_connection<M: Manager>(shared: &Weak<SharedPool<M>>) {
662623
async fn put_conn<M: Manager>(
663624
shared: &Arc<SharedPool<M>>,
664625
mut internals: MutexGuard<'_, PoolInternals<M::Connection, M::Error>>,
665-
conn: Conn<M::Connection, M::Error>,
626+
mut conn: Conn<M::Connection, M::Error>,
666627
) {
667628
if conn.raw.is_none() {
629+
conn.close(&mut internals);
668630
return maybe_open_new_connection(shared, internals).await;
669631
}
670632

671633
if internals.config.max_open > 0 && internals.num_open > internals.config.max_open {
634+
conn.close(&mut internals);
672635
return;
673636
}
674637

638+
conn.brand_new = false;
639+
675640
if internals.conn_requests.len() > 0 {
676641
let key = internals.conn_requests.keys().next().unwrap().clone();
677642
let req = internals.conn_requests.remove(&key).unwrap();
@@ -794,6 +759,13 @@ pub struct Connection<M: Manager> {
794759
conn: Option<Conn<M::Connection, M::Error>>,
795760
}
796761

762+
impl<M: Manager> Connection<M> {
763+
/// Returns true is the connection is newly established.
764+
pub fn is_brand_new(&self) -> bool {
765+
self.conn.as_ref().unwrap().brand_new
766+
}
767+
}
768+
797769
impl<M: Manager> Drop for Connection<M> {
798770
fn drop(&mut self) {
799771
let pool = self.pool.take().unwrap();

src/runtime.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ mod runtime {
7474
mod runtime {
7575
use super::*;
7676
use async_std::task;
77-
use async_std::task::JoinHandle;
7877
pub struct Runtime(TaskExecutor);
7978

8079
impl Runtime {

src/time.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,20 @@ mod time {
2020
}
2121
}
2222

23+
/// Creates new Interval that yields with interval of duration.
2324
pub fn interval(duration: Duration) -> Interval {
2425
Interval {
2526
timer: Some(Delay::new(Duration::from_secs(0))),
2627
interval: duration,
2728
}
2829
}
2930

31+
/// Wait until duration has elapsed.
3032
pub fn delay_for(duration: Duration) -> Delay {
3133
Delay::new(duration)
3234
}
3335

36+
/// Wait until deadline is reached.
3437
pub fn delay_until(deadline: Instant) -> Delay {
3538
let mut delay = Delay::new(Duration::from_secs(1));
3639
delay.reset(deadline);

tests/mobc.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,9 @@ fn test_drop_on_broken() {
187187
rt.block_on(async {
188188
let pool = Pool::new(handler);
189189

190-
assert!(pool.get().await.is_err());
190+
assert!(pool.get().await.is_ok());
191191
delay_for(Duration::from_secs(1)).await;
192+
assert!(pool.get().await.is_err());
192193
assert!(DROPPED.load(Ordering::SeqCst));
193194
Ok::<(), Error<TestError>>(())
194195
})
@@ -228,8 +229,9 @@ fn test_invalid_conn() {
228229
rt.block_on(async {
229230
let pool = Pool::builder().max_open(1).build(Handler);
230231

231-
assert!(pool.get().await.is_err());
232+
assert!(pool.get().await.is_ok());
232233
delay_for(Duration::from_secs(1)).await;
234+
assert!(pool.get().await.is_err());
233235
assert!(DROPPED.load(Ordering::SeqCst));
234236
assert_eq!(1_u64, pool.state().await.connections);
235237
Ok::<(), Error<TestError>>(())
@@ -700,3 +702,20 @@ fn test_conns_drop_on_pool_drop() {
700702
})
701703
.unwrap();
702704
}
705+
706+
#[test]
707+
fn test_is_brand_new() {
708+
let mut rt = Runtime::new().unwrap();
709+
let handler = OkManager;
710+
rt.block_on(async {
711+
let pool = Pool::builder().max_open(1).build(handler);
712+
713+
let conn = pool.get().await.ok().unwrap();
714+
assert!(conn.is_brand_new());
715+
drop(conn);
716+
let conn = pool.get().await.ok().unwrap();
717+
assert!(!conn.is_brand_new());
718+
Ok::<(), Error<TestError>>(())
719+
})
720+
.unwrap();
721+
}

0 commit comments

Comments
 (0)