Skip to content

Commit 04eb7b2

Browse files
SevInfaqrln
andauthored
fix: Ensure connection pool metrics stay consistent (#99)
* fix: Ensure connection pool metrics stay consistent This started as a bugfix to several prisma issues regarding incorrect metrics: prisma/prisma#25177 prisma/prisma#23525 And a couple of more discovered the testing, but not reported in the issues. There were several causes for this: 1. Following pattern appears quite a lot in mobc code: ```rust gauge!("something").increment(1.0); do_a_thing_that_could_fail()?; gauge!("something").decrement(1.0); ``` So, in case `do_a_thing_that_could_fail` actually fails, gauge will get incremented, but never will get decremented. 2. Couple of metrics were relying on `Conn::close` being manually called and that was not the case every once in a while. To prevent both of those problems, I rewrote the internals of library to rely on RAII rather than manual counters and resources management. `Conn` struct is now split into two: - `ActiveConn` - represents currently checked out connection that have been actively used by the client. Holds onto semaphore permit and can be converted into `IdleConn`. Doing so will free the permit. - `IdleConn` - represents idle connection, currently checked into the pool. Can be converted to `ActiveConn` by providing a valid permit. `ConnState` represents the shared state of the connection that is retained between different activity states. Both `IdleConn` and `ActiveConn` manage their corresponding gauges - increment them on creation and decrement them during drop. `ConnState` manages `CONNECTIONS_OPEN` gauge and `CONNECTIONS_TOTAL` and `CLOSED_TOTAL` counters in the same way. This system ensures that metrics stay consistent: since metrics are automatically incremented and decremented on state conversions, we can always be sure that: - Connection is always either idle or active, there is no in between state. - Idle connections and active connections gauges will always add up to the currently open connections gauge - Total connections open counter, minus total connections closed gauge will always be equal to number of currently open connections. Since resources are now managed by `Drop::drop` implementations, that removes the need for manual `close` method and simiplifies the code quite in a few places, also ensuring it is safer against future changes. * Update src/conn.rs Co-authored-by: Alexey Orlenko <[email protected]> --------- Co-authored-by: Alexey Orlenko <[email protected]>
1 parent a9555f3 commit 04eb7b2

File tree

5 files changed

+303
-173
lines changed

5 files changed

+303
-173
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ async-trait = "0.1"
2727
futures-timer = "3.0.2"
2828
log = "0.4"
2929
thiserror = "1.0"
30-
metrics = "0.22.1"
30+
metrics = "0.23.0"
3131
tracing = { version = "0.1", features = ["attributes"] }
3232
tracing-subscriber = "0.3.11"
3333

src/conn.rs

+183
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
use std::{
2+
marker::PhantomData,
3+
sync::{
4+
atomic::{AtomicU64, Ordering},
5+
Arc,
6+
},
7+
time::{Duration, Instant},
8+
};
9+
10+
use metrics::counter;
11+
use tokio::sync::OwnedSemaphorePermit;
12+
13+
use crate::metrics_utils::{
14+
GaugeGuard, ACTIVE_CONNECTIONS, CLOSED_TOTAL, IDLE_CONNECTIONS, OPENED_TOTAL, OPEN_CONNECTIONS,
15+
};
16+
17+
pub(crate) struct ActiveConn<C> {
18+
inner: C,
19+
state: ConnState,
20+
_permit: OwnedSemaphorePermit,
21+
_active_connections_gauge: GaugeGuard,
22+
}
23+
24+
impl<C> ActiveConn<C> {
25+
pub(crate) fn new(inner: C, permit: OwnedSemaphorePermit, state: ConnState) -> ActiveConn<C> {
26+
Self {
27+
inner,
28+
state,
29+
_permit: permit,
30+
_active_connections_gauge: GaugeGuard::increment(ACTIVE_CONNECTIONS),
31+
}
32+
}
33+
34+
pub(crate) fn into_idle(self) -> IdleConn<C> {
35+
IdleConn {
36+
inner: self.inner,
37+
state: self.state,
38+
_idle_connections_gauge: GaugeGuard::increment(IDLE_CONNECTIONS),
39+
}
40+
}
41+
42+
pub(crate) fn is_brand_new(&self) -> bool {
43+
self.state.brand_new
44+
}
45+
46+
pub(crate) fn set_brand_new(&mut self, brand_new: bool) {
47+
self.state.brand_new = brand_new;
48+
}
49+
50+
pub(crate) fn into_raw(self) -> C {
51+
self.inner
52+
}
53+
54+
pub(crate) fn as_raw_ref(&self) -> &C {
55+
&self.inner
56+
}
57+
58+
pub(crate) fn as_raw_mut(&mut self) -> &mut C {
59+
&mut self.inner
60+
}
61+
}
62+
63+
pub(crate) struct IdleConn<C> {
64+
inner: C,
65+
state: ConnState,
66+
_idle_connections_gauge: GaugeGuard,
67+
}
68+
69+
impl<C> IdleConn<C> {
70+
pub(crate) fn is_brand_new(&self) -> bool {
71+
self.state.brand_new
72+
}
73+
74+
pub(crate) fn into_active(self, permit: OwnedSemaphorePermit) -> ActiveConn<C> {
75+
ActiveConn::new(self.inner, permit, self.state)
76+
}
77+
78+
pub(crate) fn created_at(&self) -> Instant {
79+
self.state.created_at
80+
}
81+
82+
pub(crate) fn expired(&self, timeout: Option<Duration>) -> bool {
83+
timeout
84+
.and_then(|check_interval| {
85+
Instant::now()
86+
.checked_duration_since(self.state.created_at)
87+
.map(|dur_since| dur_since >= check_interval)
88+
})
89+
.unwrap_or(false)
90+
}
91+
92+
pub(crate) fn idle_expired(&self, timeout: Option<Duration>) -> bool {
93+
timeout
94+
.and_then(|check_interval| {
95+
Instant::now()
96+
.checked_duration_since(self.state.last_used_at)
97+
.map(|dur_since| dur_since >= check_interval)
98+
})
99+
.unwrap_or(false)
100+
}
101+
102+
pub(crate) fn needs_health_check(&self, timeout: Option<Duration>) -> bool {
103+
timeout
104+
.and_then(|check_interval| {
105+
Instant::now()
106+
.checked_duration_since(self.state.last_checked_at)
107+
.map(|dur_since| dur_since >= check_interval)
108+
})
109+
.unwrap_or(true)
110+
}
111+
112+
pub(crate) fn mark_checked(&mut self) {
113+
self.state.last_checked_at = Instant::now()
114+
}
115+
116+
pub(crate) fn split_raw(self) -> (C, ConnSplit<C>) {
117+
(
118+
self.inner,
119+
ConnSplit::new(self.state, self._idle_connections_gauge),
120+
)
121+
}
122+
}
123+
124+
pub(crate) struct ConnState {
125+
pub(crate) created_at: Instant,
126+
pub(crate) last_used_at: Instant,
127+
pub(crate) last_checked_at: Instant,
128+
pub(crate) brand_new: bool,
129+
total_connections_open: Arc<AtomicU64>,
130+
total_connections_closed: Arc<AtomicU64>,
131+
_open_connections_gauge: GaugeGuard,
132+
}
133+
134+
impl ConnState {
135+
pub(crate) fn new(
136+
total_connections_open: Arc<AtomicU64>,
137+
total_connections_closed: Arc<AtomicU64>,
138+
) -> Self {
139+
counter!(OPENED_TOTAL).increment(1);
140+
Self {
141+
created_at: Instant::now(),
142+
last_used_at: Instant::now(),
143+
last_checked_at: Instant::now(),
144+
brand_new: true,
145+
total_connections_open,
146+
total_connections_closed,
147+
_open_connections_gauge: GaugeGuard::increment(OPEN_CONNECTIONS),
148+
}
149+
}
150+
}
151+
152+
impl Drop for ConnState {
153+
fn drop(&mut self) {
154+
self.total_connections_open.fetch_sub(1, Ordering::Relaxed);
155+
self.total_connections_closed
156+
.fetch_add(1, Ordering::Relaxed);
157+
counter!(CLOSED_TOTAL).increment(1);
158+
}
159+
}
160+
161+
pub(crate) struct ConnSplit<C> {
162+
state: ConnState,
163+
gauge: GaugeGuard,
164+
_phantom: PhantomData<C>,
165+
}
166+
167+
impl<C> ConnSplit<C> {
168+
fn new(state: ConnState, gauge: GaugeGuard) -> Self {
169+
Self {
170+
state,
171+
gauge,
172+
_phantom: PhantomData,
173+
}
174+
}
175+
176+
pub(crate) fn restore(self, raw: C) -> IdleConn<C> {
177+
IdleConn {
178+
inner: raw,
179+
state: self.state,
180+
_idle_connections_gauge: self.gauge,
181+
}
182+
}
183+
}

0 commit comments

Comments
 (0)