Skip to content

Commit 9228f20

Browse files
committed
fixup! syn2mas: Add progress reporting to log and to opentelemetry metrics
Add metrics directly within syn2mas, no background thread
1 parent ebad8a7 commit 9228f20

File tree

7 files changed

+232
-59
lines changed

7 files changed

+232
-59
lines changed

Cargo.lock

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/cli/src/commands/syn2mas.rs

+1-36
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,14 @@ use mas_config::{
1010
};
1111
use mas_storage::SystemClock;
1212
use mas_storage_pg::MIGRATOR;
13-
use opentelemetry::KeyValue;
1413
use rand::thread_rng;
1514
use sqlx::{Connection, Either, PgConnection, postgres::PgConnectOptions, types::Uuid};
1615
use syn2mas::{
1716
LockedMasDatabase, MasWriter, Progress, ProgressStage, SynapseReader, synapse_config,
1817
};
1918
use tracing::{Instrument, error, info, info_span, warn};
2019

21-
use crate::{
22-
telemetry::METER,
23-
util::{DatabaseConnectOptions, database_connection_from_config_with_options},
24-
};
20+
use crate::util::{DatabaseConnectOptions, database_connection_from_config_with_options};
2521

2622
/// The exit code used by `syn2mas check` and `syn2mas migrate` when there are
2723
/// errors preventing migration.
@@ -258,7 +254,6 @@ impl Options {
258254

259255
let occasional_progress_logger_task =
260256
tokio::spawn(occasional_progress_logger(progress.clone()));
261-
let progress_telemetry_task = tokio::spawn(progress_telemetry(progress.clone()));
262257

263258
let mas_matrix = MatrixConfig::extract(figment)?;
264259
eprintln!("\n\n");
@@ -274,7 +269,6 @@ impl Options {
274269
.await?;
275270

276271
occasional_progress_logger_task.abort();
277-
progress_telemetry_task.abort();
278272

279273
Ok(ExitCode::SUCCESS)
280274
}
@@ -312,32 +306,3 @@ async fn occasional_progress_logger(progress: Progress) {
312306
}
313307
}
314308
}
315-
316-
/// Reports migration progress as OpenTelemetry metrics
317-
async fn progress_telemetry(progress: Progress) {
318-
let migrated_data_counter = METER
319-
.u64_gauge("migrated_data")
320-
.with_description("How many entities have been migrated so far")
321-
.build();
322-
let max_data_counter = METER
323-
.u64_gauge("max_data")
324-
.with_description("How many entities of the given type exist (approximate)")
325-
.build();
326-
327-
loop {
328-
tokio::time::sleep(Duration::from_secs(10)).await;
329-
if let ProgressStage::MigratingData {
330-
entity,
331-
migrated,
332-
approx_count,
333-
} = &**progress.get_current_stage()
334-
{
335-
let metrics_kv = [KeyValue::new("entity", *entity)];
336-
let migrated = migrated.load(Ordering::Relaxed);
337-
migrated_data_counter.record(u64::from(migrated), &metrics_kv);
338-
max_data_counter.record(*approx_count, &metrics_kv);
339-
} else {
340-
// not sure how to map other stages
341-
}
342-
}
343-
}

crates/syn2mas/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ ulid = { workspace = true, features = ["uuid"] }
3535
mas-config.workspace = true
3636
mas-storage.workspace = true
3737

38+
opentelemetry.workspace = true
39+
opentelemetry-semantic-conventions.workspace = true
40+
3841
[dev-dependencies]
3942
mas-storage-pg.workspace = true
4043

crates/syn2mas/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ mod synapse_reader;
88

99
mod migration;
1010
mod progress;
11+
mod telemetry;
1112

1213
type RandomState = rustc_hash::FxBuildHasher;
1314
type HashMap<K, V> = rustc_hash::FxHashMap<K, V>;

0 commit comments

Comments
 (0)