Skip to content

Commit ebad8a7

Browse files
committed
syn2mas: Add progress reporting to log and to opentelemetry metrics
1 parent 0afeb89 commit ebad8a7

File tree

4 files changed

+214
-12
lines changed

4 files changed

+214
-12
lines changed

crates/cli/src/commands/syn2mas.rs

+80-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{collections::HashMap, process::ExitCode};
1+
use std::{collections::HashMap, process::ExitCode, sync::atomic::Ordering, time::Duration};
22

33
use anyhow::Context;
44
use camino::Utf8PathBuf;
@@ -10,12 +10,18 @@ use mas_config::{
1010
};
1111
use mas_storage::SystemClock;
1212
use mas_storage_pg::MIGRATOR;
13+
use opentelemetry::KeyValue;
1314
use rand::thread_rng;
1415
use sqlx::{Connection, Either, PgConnection, postgres::PgConnectOptions, types::Uuid};
15-
use syn2mas::{LockedMasDatabase, MasWriter, SynapseReader, synapse_config};
16-
use tracing::{Instrument, error, info_span, warn};
16+
use syn2mas::{
17+
LockedMasDatabase, MasWriter, Progress, ProgressStage, SynapseReader, synapse_config,
18+
};
19+
use tracing::{Instrument, error, info, info_span, warn};
1720

18-
use crate::util::{DatabaseConnectOptions, database_connection_from_config_with_options};
21+
use crate::{
22+
telemetry::METER,
23+
util::{DatabaseConnectOptions, database_connection_from_config_with_options},
24+
};
1925

2026
/// The exit code used by `syn2mas check` and `syn2mas migrate` when there are
2127
/// errors preventing migration.
@@ -248,7 +254,12 @@ impl Options {
248254
#[allow(clippy::disallowed_methods)]
249255
let mut rng = thread_rng();
250256

251-
// TODO progress reporting
257+
let progress = Progress::default();
258+
259+
let occasional_progress_logger_task =
260+
tokio::spawn(occasional_progress_logger(progress.clone()));
261+
let progress_telemetry_task = tokio::spawn(progress_telemetry(progress.clone()));
262+
252263
let mas_matrix = MatrixConfig::extract(figment)?;
253264
eprintln!("\n\n");
254265
syn2mas::migrate(
@@ -258,11 +269,75 @@ impl Options {
258269
&clock,
259270
&mut rng,
260271
provider_id_mappings,
272+
&progress,
261273
)
262274
.await?;
263275

276+
occasional_progress_logger_task.abort();
277+
progress_telemetry_task.abort();
278+
264279
Ok(ExitCode::SUCCESS)
265280
}
266281
}
267282
}
268283
}
284+
285+
/// Logs progress every 30 seconds, as a lightweight alternative to a progress
286+
/// bar. For most deployments, the migration will not take 30 seconds so this
287+
/// will not be relevant. In other cases, this will give the operator an idea of
288+
/// what's going on.
289+
async fn occasional_progress_logger(progress: Progress) {
290+
loop {
291+
tokio::time::sleep(Duration::from_secs(30)).await;
292+
match &**progress.get_current_stage() {
293+
ProgressStage::SettingUp => {
294+
info!(name: "progress", "still setting up");
295+
}
296+
ProgressStage::MigratingData {
297+
entity,
298+
migrated,
299+
approx_count,
300+
} => {
301+
let migrated = migrated.load(Ordering::Relaxed);
302+
#[allow(clippy::cast_precision_loss)]
303+
let percent = (f64::from(migrated) / *approx_count as f64) * 100.0;
304+
info!(name: "progress", "migrating {entity}: {migrated}/~{approx_count} (~{percent:.1}%)");
305+
}
306+
ProgressStage::RebuildIndex { index_name } => {
307+
info!(name: "progress", "still waiting for rebuild of index {index_name}");
308+
}
309+
ProgressStage::RebuildConstraint { constraint_name } => {
310+
info!(name: "progress", "still waiting for rebuild of constraint {constraint_name}");
311+
}
312+
}
313+
}
314+
}
315+
316+
/// Reports migration progress as OpenTelemetry metrics
317+
async fn progress_telemetry(progress: Progress) {
318+
let migrated_data_counter = METER
319+
.u64_gauge("migrated_data")
320+
.with_description("How many entities have been migrated so far")
321+
.build();
322+
let max_data_counter = METER
323+
.u64_gauge("max_data")
324+
.with_description("How many entities of the given type exist (approximate)")
325+
.build();
326+
327+
loop {
328+
tokio::time::sleep(Duration::from_secs(10)).await;
329+
if let ProgressStage::MigratingData {
330+
entity,
331+
migrated,
332+
approx_count,
333+
} = &**progress.get_current_stage()
334+
{
335+
let metrics_kv = [KeyValue::new("entity", *entity)];
336+
let migrated = migrated.load(Ordering::Relaxed);
337+
migrated_data_counter.record(u64::from(migrated), &metrics_kv);
338+
max_data_counter.record(*approx_count, &metrics_kv);
339+
} else {
340+
// not sure how to map other stages
341+
}
342+
}
343+
}

crates/syn2mas/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@ mod mas_writer;
77
mod synapse_reader;
88

99
mod migration;
10+
mod progress;
1011

1112
type RandomState = rustc_hash::FxBuildHasher;
1213
type HashMap<K, V> = rustc_hash::FxHashMap<K, V>;
1314

1415
pub use self::{
1516
mas_writer::{MasWriter, checks::mas_pre_migration_checks, locking::LockedMasDatabase},
1617
migration::migrate,
18+
progress::{Progress, ProgressStage},
1719
synapse_reader::{
1820
SynapseReader,
1921
checks::{

crates/syn2mas/src/migration.rs

+78-7
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,14 @@
1111
//! This module does not implement any of the safety checks that should be run
1212
//! *before* the migration.
1313
14-
use std::{pin::pin, time::Instant};
14+
use std::{
15+
pin::pin,
16+
sync::{
17+
Arc,
18+
atomic::{AtomicU32, Ordering},
19+
},
20+
time::Instant,
21+
};
1522

1623
use chrono::{DateTime, Utc};
1724
use compact_str::CompactString;
@@ -32,6 +39,7 @@ use crate::{
3239
MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
3340
MasNewUserPassword, MasWriteBuffer, MasWriter,
3441
},
42+
progress::{Progress, ProgressStage},
3543
synapse_reader::{
3644
self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice,
3745
SynapseExternalId, SynapseRefreshableTokenPair, SynapseThreepid, SynapseUser,
@@ -147,6 +155,7 @@ pub async fn migrate(
147155
clock: &dyn Clock,
148156
rng: &mut impl RngCore,
149157
provider_id_mapping: std::collections::HashMap<String, Uuid>,
158+
progress: &Progress,
150159
) -> Result<(), Error> {
151160
let counts = synapse.count_rows().await.into_synapse("counting users")?;
152161

@@ -162,14 +171,58 @@ pub async fn migrate(
162171
provider_id_mapping,
163172
};
164173

165-
let (mas, state) = migrate_users(&mut synapse, mas, state, rng).await?;
166-
let (mas, state) = migrate_threepids(&mut synapse, mas, rng, state).await?;
167-
let (mas, state) = migrate_external_ids(&mut synapse, mas, rng, state).await?;
174+
let migrated_counter = Arc::new(AtomicU32::new(0));
175+
progress.set_current_stage(ProgressStage::MigratingData {
176+
entity: "users",
177+
migrated: migrated_counter.clone(),
178+
approx_count: counts.users as u64,
179+
});
180+
let (mas, state) = migrate_users(&mut synapse, mas, state, rng, migrated_counter).await?;
181+
182+
let migrated_counter = Arc::new(AtomicU32::new(0));
183+
progress.set_current_stage(ProgressStage::MigratingData {
184+
entity: "threepids",
185+
migrated: migrated_counter.clone(),
186+
approx_count: counts.users as u64,
187+
});
188+
let (mas, state) = migrate_threepids(&mut synapse, mas, rng, state, &migrated_counter).await?;
189+
190+
let migrated_counter = Arc::new(AtomicU32::new(0));
191+
progress.set_current_stage(ProgressStage::MigratingData {
192+
entity: "external_ids",
193+
migrated: migrated_counter.clone(),
194+
approx_count: counts.users as u64,
195+
});
196+
let (mas, state) =
197+
migrate_external_ids(&mut synapse, mas, rng, state, &migrated_counter).await?;
198+
199+
let migrated_counter = Arc::new(AtomicU32::new(0));
200+
progress.set_current_stage(ProgressStage::MigratingData {
201+
entity: "unrefreshable_access_tokens",
202+
migrated: migrated_counter.clone(),
203+
approx_count: counts.users as u64,
204+
});
168205
let (mas, state) =
169-
migrate_unrefreshable_access_tokens(&mut synapse, mas, clock, rng, state).await?;
206+
migrate_unrefreshable_access_tokens(&mut synapse, mas, clock, rng, state, migrated_counter)
207+
.await?;
208+
209+
let migrated_counter = Arc::new(AtomicU32::new(0));
210+
progress.set_current_stage(ProgressStage::MigratingData {
211+
entity: "refreshable_token_pairs",
212+
migrated: migrated_counter.clone(),
213+
approx_count: counts.users as u64,
214+
});
170215
let (mas, state) =
171-
migrate_refreshable_token_pairs(&mut synapse, mas, clock, rng, state).await?;
172-
let (mas, _state) = migrate_devices(&mut synapse, mas, rng, state).await?;
216+
migrate_refreshable_token_pairs(&mut synapse, mas, clock, rng, state, &migrated_counter)
217+
.await?;
218+
219+
let migrated_counter = Arc::new(AtomicU32::new(0));
220+
progress.set_current_stage(ProgressStage::MigratingData {
221+
entity: "devices",
222+
migrated: migrated_counter.clone(),
223+
approx_count: counts.users as u64,
224+
});
225+
let (mas, _state) = migrate_devices(&mut synapse, mas, rng, state, migrated_counter).await?;
173226

174227
synapse
175228
.finish()
@@ -189,6 +242,7 @@ async fn migrate_users(
189242
mut mas: MasWriter,
190243
mut state: MigrationState,
191244
rng: &mut impl RngCore,
245+
progress_counter: Arc<AtomicU32>,
192246
) -> Result<(MasWriter, MigrationState), Error> {
193247
let start = Instant::now();
194248

@@ -261,6 +315,8 @@ async fn migrate_users(
261315
.await
262316
.into_mas("writing password")?;
263317
}
318+
319+
progress_counter.fetch_add(1, Ordering::Relaxed);
264320
}
265321

266322
user_buffer
@@ -304,6 +360,7 @@ async fn migrate_threepids(
304360
mut mas: MasWriter,
305361
rng: &mut impl RngCore,
306362
state: MigrationState,
363+
progress_counter: &AtomicU32,
307364
) -> Result<(MasWriter, MigrationState), Error> {
308365
let start = Instant::now();
309366

@@ -365,6 +422,8 @@ async fn migrate_threepids(
365422
.await
366423
.into_mas("writing unsupported threepid")?;
367424
}
425+
426+
progress_counter.fetch_add(1, Ordering::Relaxed);
368427
}
369428

370429
email_buffer
@@ -394,6 +453,7 @@ async fn migrate_external_ids(
394453
mut mas: MasWriter,
395454
rng: &mut impl RngCore,
396455
state: MigrationState,
456+
progress_counter: &AtomicU32,
397457
) -> Result<(MasWriter, MigrationState), Error> {
398458
let start = Instant::now();
399459

@@ -447,6 +507,8 @@ async fn migrate_external_ids(
447507
)
448508
.await
449509
.into_mas("failed to write upstream link")?;
510+
511+
progress_counter.fetch_add(1, Ordering::Relaxed);
450512
}
451513

452514
write_buffer
@@ -476,6 +538,7 @@ async fn migrate_devices(
476538
mut mas: MasWriter,
477539
rng: &mut impl RngCore,
478540
mut state: MigrationState,
541+
progress_counter: Arc<AtomicU32>,
479542
) -> Result<(MasWriter, MigrationState), Error> {
480543
let start = Instant::now();
481544

@@ -563,6 +626,8 @@ async fn migrate_devices(
563626
)
564627
.await
565628
.into_mas("writing compat sessions")?;
629+
630+
progress_counter.fetch_add(1, Ordering::Relaxed);
566631
}
567632

568633
write_buffer
@@ -605,6 +670,7 @@ async fn migrate_unrefreshable_access_tokens(
605670
clock: &dyn Clock,
606671
rng: &mut impl RngCore,
607672
mut state: MigrationState,
673+
progress_counter: Arc<AtomicU32>,
608674
) -> Result<(MasWriter, MigrationState), Error> {
609675
let start = Instant::now();
610676

@@ -704,6 +770,8 @@ async fn migrate_unrefreshable_access_tokens(
704770
)
705771
.await
706772
.into_mas("writing compat access tokens")?;
773+
774+
progress_counter.fetch_add(1, Ordering::Relaxed);
707775
}
708776
write_buffer
709777
.finish(&mut mas)
@@ -749,6 +817,7 @@ async fn migrate_refreshable_token_pairs(
749817
clock: &dyn Clock,
750818
rng: &mut impl RngCore,
751819
mut state: MigrationState,
820+
progress_counter: &AtomicU32,
752821
) -> Result<(MasWriter, MigrationState), Error> {
753822
let start = Instant::now();
754823

@@ -830,6 +899,8 @@ async fn migrate_refreshable_token_pairs(
830899
)
831900
.await
832901
.into_mas("writing compat refresh tokens")?;
902+
903+
progress_counter.fetch_add(1, Ordering::Relaxed);
833904
}
834905

835906
access_token_write_buffer

crates/syn2mas/src/progress.rs

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use std::sync::{Arc, atomic::AtomicU32};
2+
3+
use arc_swap::ArcSwap;
4+
5+
/// Tracker for the progress of the migration
6+
///
7+
/// Cloning this struct intuitively gives a 'handle' to the same counters,
8+
/// which means it can be shared between tasks/threads.
9+
#[derive(Clone)]
10+
pub struct Progress {
11+
current_stage: Arc<ArcSwap<ProgressStage>>,
12+
}
13+
14+
impl Progress {
15+
/// Sets the current stage of progress.
16+
///
17+
/// This is probably not cheap enough to use for every individual row,
18+
/// so use of atomic integers for the fields that will be updated is
19+
/// recommended.
20+
#[inline]
21+
pub fn set_current_stage(&self, stage: ProgressStage) {
22+
self.current_stage.store(Arc::new(stage));
23+
}
24+
25+
/// Returns the current stage of progress.
26+
#[inline]
27+
#[must_use]
28+
pub fn get_current_stage(&self) -> arc_swap::Guard<Arc<ProgressStage>> {
29+
self.current_stage.load()
30+
}
31+
}
32+
33+
impl Default for Progress {
34+
fn default() -> Self {
35+
Self {
36+
current_stage: Arc::new(ArcSwap::new(Arc::new(ProgressStage::SettingUp))),
37+
}
38+
}
39+
}
40+
41+
/// A stage of the migration, as reported through [`Progress`].
#[derive(Debug)]
pub enum ProgressStage {
    /// No data is being migrated yet. This is the stage a default
    /// [`Progress`] starts in.
    SettingUp,

    /// Rows of one kind of entity are being migrated.
    MigratingData {
        /// Name of the kind of entity being migrated, e.g. `"users"`.
        entity: &'static str,

        /// Number of entities migrated so far in this stage.
        ///
        /// This is shared with the code doing the migration, which bumps it
        /// as it goes, so that per-row updates do not need a stage change.
        migrated: Arc<AtomicU32>,

        /// Approximate total number of entities to migrate in this stage.
        /// May be zero, and is not guaranteed to be an upper bound of
        /// `migrated`.
        approx_count: u64,
    },

    /// An index is being rebuilt.
    RebuildIndex {
        /// Name of the index being rebuilt.
        index_name: String,
    },

    /// A constraint is being rebuilt.
    RebuildConstraint {
        /// Name of the constraint being rebuilt.
        constraint_name: String,
    },
}

0 commit comments

Comments
 (0)