Skip to content

Commit e243b62

Browse files
authored
Avoid deadlocks when flushing session activities (#4463)
2 parents 305914e + e2dde82 commit e243b62

File tree

5 files changed

+54
-26
lines changed

5 files changed

+54
-26
lines changed

crates/handlers/src/activity_tracker/mod.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,8 @@ impl ActivityTracker {
185185
// This guard on the shutdown token is to ensure that if this task crashes for
186186
// any reason, the server will shut down
187187
let _guard = cancellation_token.clone().drop_guard();
188+
let mut interval = tokio::time::interval(interval);
189+
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
188190

189191
loop {
190192
tokio::select! {
@@ -202,7 +204,7 @@ impl ActivityTracker {
202204
}
203205

204206

205-
() = tokio::time::sleep(interval) => {
207+
_ = interval.tick() => {
206208
self.flush().await;
207209
}
208210
}

crates/handlers/src/activity_tracker/worker.rs

+24-7
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@
77
use std::{collections::HashMap, net::IpAddr};
88

99
use chrono::{DateTime, Utc};
10-
use mas_storage::{RepositoryAccess, user::BrowserSessionRepository};
10+
use mas_storage::{RepositoryAccess, RepositoryError, user::BrowserSessionRepository};
1111
use opentelemetry::{
1212
Key, KeyValue,
13-
metrics::{Counter, Histogram},
13+
metrics::{Counter, Gauge, Histogram},
1414
};
1515
use sqlx::PgPool;
1616
use tokio_util::sync::CancellationToken;
@@ -25,8 +25,8 @@ use crate::{
2525
/// database automatically.
2626
///
2727
/// The [`ActivityRecord`] structure plus the key in the [`HashMap`] takes less
28-
/// than 100 bytes, so this should allocate around a megabyte of memory.
29-
static MAX_PENDING_RECORDS: usize = 10_000;
28+
/// than 100 bytes, so this should allocate around 100kB of memory.
29+
static MAX_PENDING_RECORDS: usize = 1000;
3030

3131
const TYPE: Key = Key::from_static_str("type");
3232
const SESSION_KIND: Key = Key::from_static_str("session_kind");
@@ -45,6 +45,7 @@ struct ActivityRecord {
4545
pub struct Worker {
4646
pool: PgPool,
4747
pending_records: HashMap<(SessionKind, Ulid), ActivityRecord>,
48+
pending_records_gauge: Gauge<u64>,
4849
message_counter: Counter<u64>,
4950
flush_time_histogram: Histogram<u64>,
5051
}
@@ -80,9 +81,17 @@ impl Worker {
8081
.with_unit("ms")
8182
.build();
8283

84+
let pending_records_gauge = METER
85+
.u64_gauge("mas.activity_tracker.pending_records")
86+
.with_description("The number of pending activity records")
87+
.with_unit("{records}")
88+
.build();
89+
pending_records_gauge.record(0, &[]);
90+
8391
Self {
8492
pool,
8593
pending_records: HashMap::with_capacity(MAX_PENDING_RECORDS),
94+
pending_records_gauge,
8695
message_counter,
8796
flush_time_histogram,
8897
}
@@ -165,6 +174,10 @@ impl Worker {
165174
let _ = tx.send(());
166175
}
167176
}
177+
178+
// Update the gauge
179+
self.pending_records_gauge
180+
.record(self.pending_records.len() as u64, &[]);
168181
}
169182

170183
// Flush one last time
@@ -193,18 +206,22 @@ impl Worker {
193206
Err(e) => {
194207
self.flush_time_histogram
195208
.record(duration_ms, &[KeyValue::new(RESULT, "failure")]);
196-
tracing::error!("Failed to flush activity tracker: {}", e);
209+
tracing::error!(
210+
error = &e as &dyn std::error::Error,
211+
"Failed to flush activity tracker"
212+
);
197213
}
198214
}
199215
}
200216

201217
/// Fallible part of [`Self::flush`].
202218
#[tracing::instrument(name = "activity_tracker.flush", skip(self))]
203-
async fn try_flush(&mut self) -> Result<(), anyhow::Error> {
219+
async fn try_flush(&mut self) -> Result<(), RepositoryError> {
204220
let pending_records = &self.pending_records;
205221

206222
let mut repo = mas_storage_pg::PgRepository::from_pool(&self.pool)
207-
.await?
223+
.await
224+
.map_err(RepositoryError::from_error)?
208225
.boxed();
209226

210227
let mut browser_sessions = Vec::new();

crates/storage-pg/src/compat/session.rs

+9-6
Original file line numberDiff line numberDiff line change
@@ -549,13 +549,16 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> {
549549
)]
550550
async fn record_batch_activity(
551551
&mut self,
552-
activity: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
552+
mut activities: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
553553
) -> Result<(), Self::Error> {
554-
let mut ids = Vec::with_capacity(activity.len());
555-
let mut last_activities = Vec::with_capacity(activity.len());
556-
let mut ips = Vec::with_capacity(activity.len());
557-
558-
for (id, last_activity, ip) in activity {
554+
// Sort the activity by ID, so that when batching the updates, Postgres
555+
// locks the rows in a stable order, preventing deadlocks
556+
activities.sort_unstable();
557+
let mut ids = Vec::with_capacity(activities.len());
558+
let mut last_activities = Vec::with_capacity(activities.len());
559+
let mut ips = Vec::with_capacity(activities.len());
560+
561+
for (id, last_activity, ip) in activities {
559562
ids.push(Uuid::from(id));
560563
last_activities.push(last_activity);
561564
ips.push(ip);

crates/storage-pg/src/oauth2/session.rs

+9-6
Original file line numberDiff line numberDiff line change
@@ -445,13 +445,16 @@ impl OAuth2SessionRepository for PgOAuth2SessionRepository<'_> {
445445
)]
446446
async fn record_batch_activity(
447447
&mut self,
448-
activity: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
448+
mut activities: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
449449
) -> Result<(), Self::Error> {
450-
let mut ids = Vec::with_capacity(activity.len());
451-
let mut last_activities = Vec::with_capacity(activity.len());
452-
let mut ips = Vec::with_capacity(activity.len());
453-
454-
for (id, last_activity, ip) in activity {
450+
// Sort the activity by ID, so that when batching the updates, Postgres
451+
// locks the rows in a stable order, preventing deadlocks
452+
activities.sort_unstable();
453+
let mut ids = Vec::with_capacity(activities.len());
454+
let mut last_activities = Vec::with_capacity(activities.len());
455+
let mut ips = Vec::with_capacity(activities.len());
456+
457+
for (id, last_activity, ip) in activities {
455458
ids.push(Uuid::from(id));
456459
last_activities.push(last_activity);
457460
ips.push(ip);

crates/storage-pg/src/user/session.rs

+9-6
Original file line numberDiff line numberDiff line change
@@ -564,13 +564,16 @@ impl BrowserSessionRepository for PgBrowserSessionRepository<'_> {
564564
)]
565565
async fn record_batch_activity(
566566
&mut self,
567-
activity: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
567+
mut activities: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
568568
) -> Result<(), Self::Error> {
569-
let mut ids = Vec::with_capacity(activity.len());
570-
let mut last_activities = Vec::with_capacity(activity.len());
571-
let mut ips = Vec::with_capacity(activity.len());
572-
573-
for (id, last_activity, ip) in activity {
569+
// Sort the activity by ID, so that when batching the updates, Postgres
570+
// locks the rows in a stable order, preventing deadlocks
571+
activities.sort_unstable();
572+
let mut ids = Vec::with_capacity(activities.len());
573+
let mut last_activities = Vec::with_capacity(activities.len());
574+
let mut ips = Vec::with_capacity(activities.len());
575+
576+
for (id, last_activity, ip) in activities {
574577
ids.push(Uuid::from(id));
575578
last_activities.push(last_activity);
576579
ips.push(ip);

0 commit comments

Comments
 (0)