Skip to content

Commit 3451362

Browse files
committed
make progress writer multi-thread safe, add deploy stages
Signed-off-by: Antheas Kapenekakis <[email protected]>
1 parent c5e5d95 commit 3451362

File tree

3 files changed

+64
-32
lines changed

3 files changed

+64
-32
lines changed

lib/src/cli.rs

+8-13
Original file line numberDiff line numberDiff line change
@@ -686,7 +686,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
686686
}
687687
}
688688
} else {
689-
let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet, prog).await?;
689+
let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet, prog.clone()).await?;
690690
let staged_digest = staged_image.map(|s| s.digest().expect("valid digest in status"));
691691
let fetched_digest = &fetched.manifest_digest;
692692
tracing::debug!("staged: {staged_digest:?}");
@@ -709,7 +709,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
709709
println!("No update available.")
710710
} else {
711711
let osname = booted_deployment.osname();
712-
crate::deploy::stage(sysroot, &osname, &fetched, &spec).await?;
712+
crate::deploy::stage(sysroot, &osname, &fetched, &spec, prog.clone()).await?;
713713
changed = true;
714714
if let Some(prev) = booted_image.as_ref() {
715715
if let Some(fetched_manifest) = fetched.get_manifest(repo)? {
@@ -784,7 +784,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> {
784784
}
785785
let new_spec = RequiredHostSpec::from_spec(&new_spec)?;
786786

787-
let fetched = crate::deploy::pull(repo, &target, None, opts.quiet, prog).await?;
787+
let fetched = crate::deploy::pull(repo, &target, None, opts.quiet, prog.clone()).await?;
788788

789789
if !opts.retain {
790790
// By default, we prune the previous ostree ref so it will go away after later upgrades
@@ -798,7 +798,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> {
798798
}
799799

800800
let stateroot = booted_deployment.osname();
801-
crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec).await?;
801+
crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec, prog).await?;
802802

803803
if opts.apply {
804804
crate::reboot::reboot()?;
@@ -840,25 +840,20 @@ async fn edit(opts: EditOpts) -> Result<()> {
840840
host.spec.verify_transition(&new_host.spec)?;
841841
let new_spec = RequiredHostSpec::from_spec(&new_host.spec)?;
842842

843+
let prog = ProgressWriter::from_empty();
844+
843845
// We only support two state transitions right now; switching the image,
844846
// or flipping the bootloader ordering.
845847
if host.spec.boot_order != new_host.spec.boot_order {
846848
return crate::deploy::rollback(sysroot).await;
847849
}
848850

849-
let fetched = crate::deploy::pull(
850-
repo,
851-
new_spec.image,
852-
None,
853-
opts.quiet,
854-
ProgressWriter::from_empty(),
855-
)
856-
.await?;
851+
let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet, prog.clone()).await?;
857852

858853
// TODO gc old layers here
859854

860855
let stateroot = booted_deployment.osname();
861-
crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec).await?;
856+
crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec, prog).await?;
862857

863858
Ok(())
864859
}

lib/src/deploy.rs

+23-7
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ async fn handle_layer_progress_print(
146146
layers_total: usize,
147147
bytes_download: u64,
148148
bytes_total: u64,
149-
mut prog: ProgressWriter,
149+
prog: ProgressWriter,
150150
) {
151151
let start = std::time::Instant::now();
152152
let mut total_read = 0u64;
@@ -219,7 +219,7 @@ async fn handle_layer_progress_print(
219219
// They are common enough, anyhow. Debounce on time.
220220
let curr = std::time::Instant::now();
221221
if curr.duration_since(last_json_written).as_secs_f64() > 0.2 {
222-
prog.send(ProgressStage::Fetching {
222+
prog.send(ProgressStage::Fetch {
223223
bytes_done,
224224
bytes_download,
225225
bytes_total,
@@ -243,14 +243,12 @@ async fn handle_layer_progress_print(
243243
let elapsed = end.duration_since(start);
244244
let persec = total_read as f64 / elapsed.as_secs_f64();
245245
let persec = indicatif::HumanBytes(persec as u64);
246-
if let Err(e) = bar.println(&format!(
246+
println!(
247247
"Fetched layers: {} in {} ({}/s)",
248248
indicatif::HumanBytes(total_read),
249249
indicatif::HumanDuration(elapsed),
250250
persec,
251-
)) {
252-
tracing::warn!("writing to stdout: {e}");
253-
}
251+
);
254252
}
255253

256254
/// Wrapper for pulling a container image, wiring up status output.
@@ -296,7 +294,7 @@ pub(crate) async fn pull(
296294
layers_total,
297295
bytes_download,
298296
bytes_total,
299-
prog,
297+
prog.clone(),
300298
)
301299
.await
302300
})
@@ -493,7 +491,15 @@ pub(crate) async fn stage(
493491
stateroot: &str,
494492
image: &ImageState,
495493
spec: &RequiredHostSpec<'_>,
494+
prog: ProgressWriter,
496495
) -> Result<()> {
496+
let n_steps = 3;
497+
498+
prog.send(ProgressStage::Deploy {
499+
n_steps,
500+
step: 0,
501+
name: "deploying".to_string(),
502+
});
497503
let merge_deployment = sysroot.merge_deployment(Some(stateroot));
498504
let origin = origin_from_imageref(spec.image)?;
499505
let deployment = crate::deploy::deploy(
@@ -505,8 +511,18 @@ pub(crate) async fn stage(
505511
)
506512
.await?;
507513

514+
prog.send(ProgressStage::Deploy {
515+
n_steps,
516+
step: 1,
517+
name: "pulling_bound_images".to_string(),
518+
});
508519
crate::boundimage::pull_bound_images(sysroot, &deployment).await?;
509520

521+
prog.send(ProgressStage::Deploy {
522+
n_steps,
523+
step: 2,
524+
name: "cleaning_up".to_string(),
525+
});
510526
crate::deploy::cleanup(sysroot).await?;
511527
println!("Queued for next boot: {:#}", spec.image);
512528
if let Some(version) = image.version.as_deref() {

lib/src/progress_jsonl.rs

+33-12
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
44
use std::fs;
55
use std::io::{BufWriter, Write};
6+
use std::mem;
67
use std::os::fd::{FromRawFd, RawFd};
8+
use std::sync::{Arc, Mutex};
79

810
use anyhow::Result;
911
use serde::Serialize;
@@ -20,8 +22,8 @@ pub struct LayerState {
2022
#[derive(Debug, serde::Serialize, serde::Deserialize)]
2123
#[serde(tag = "stage")]
2224
pub enum ProgressStage {
23-
#[serde(rename = "fetching")]
24-
Fetching {
25+
#[serde(rename = "fetch")]
26+
Fetch {
2527
bytes_done: u64,
2628
bytes_download: u64,
2729
bytes_total: u64,
@@ -30,16 +32,23 @@ pub enum ProgressStage {
3032
layers_total: usize,
3133
layers: Option<Vec<LayerState>>,
3234
},
35+
#[serde(rename = "deploy")]
36+
Deploy {
37+
n_steps: usize,
38+
step: usize,
39+
name: String,
40+
},
3341
}
3442

43+
#[derive(Clone)]
3544
pub(crate) struct ProgressWriter {
36-
fd: Option<BufWriter<fs::File>>,
45+
fd: Arc<Mutex<Option<BufWriter<fs::File>>>>,
3746
}
3847

3948
impl From<fs::File> for ProgressWriter {
4049
fn from(value: fs::File) -> Self {
4150
Self {
42-
fd: Some(BufWriter::new(value)),
51+
fd: Arc::new(Mutex::new(Some(BufWriter::new(value)))),
4352
}
4453
}
4554
}
@@ -52,15 +61,21 @@ impl ProgressWriter {
5261
}
5362

5463
pub(crate) fn from_empty() -> Self {
55-
Self { fd: None }
64+
Self {
65+
fd: Arc::new(Mutex::new(None)),
66+
}
5667
}
5768

5869
/// Serialize the target object to JSON as a single line
59-
pub(crate) fn send_unchecked<T: Serialize>(&mut self, v: T) -> Result<()> {
60-
if self.fd.is_none() {
70+
pub(crate) fn send_unchecked<T: Serialize>(&self, v: T) -> Result<()> {
71+
let arc = self.fd.clone();
72+
let mut mutex = arc.lock().expect("Could not lock mutex");
73+
let fd_opt = mutex.as_mut();
74+
75+
if fd_opt.is_none() {
6176
return Ok(());
6277
}
63-
let mut fd = self.fd.as_mut().unwrap();
78+
let mut fd = fd_opt.unwrap();
6479

6580
// serde is guaranteed not to output newlines here
6681
serde_json::to_writer(&mut fd, &v)?;
@@ -71,18 +86,24 @@ impl ProgressWriter {
7186
Ok(())
7287
}
7388

74-
pub(crate) fn send<T: Serialize>(&mut self, v: T) {
89+
pub(crate) fn send<T: Serialize>(&self, v: T) {
7590
if let Err(e) = self.send_unchecked(v) {
7691
eprintln!("Failed to write to jsonl: {}", e);
7792
// Stop writing to fd but let process continue
78-
self.fd = None;
93+
let arc = self.fd.clone();
94+
let mut mutex = arc.lock().expect("Could not lock mutex");
95+
*mutex = None.into();
7996
}
8097
}
8198

8299
/// Flush remaining data and return the underlying file.
83100
#[allow(dead_code)]
84101
pub(crate) fn into_inner(self) -> Result<fs::File> {
85-
if let Some(fd) = self.fd {
102+
let arc = self.fd.clone();
103+
let mut mutex = arc.lock().expect("Could not lock mutex");
104+
let fd_opt = mem::replace(&mut *mutex, None);
105+
106+
if let Some(fd) = fd_opt {
86107
return fd.into_inner().map_err(Into::into);
87108
} else {
88109
return Err(anyhow::anyhow!("File descriptor closed/never existed."));
@@ -107,7 +128,7 @@ mod test {
107128
#[test]
108129
fn test_jsonl() -> Result<()> {
109130
let tf = tempfile::tempfile()?;
110-
let mut w = ProgressWriter::from(tf);
131+
let w = ProgressWriter::from(tf);
111132
let testvalues = [
112133
S {
113134
s: "foo".into(),

0 commit comments

Comments
 (0)