Skip to content

Commit 9b0d697

Browse files
committed
feat: add json output to stderr of upgrade and switch
1 parent 201d358 commit 9b0d697

File tree

3 files changed

+95
-9
lines changed

3 files changed

+95
-9
lines changed

lib/src/cli.rs

+11-3
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ pub(crate) struct UpgradeOpts {
5252
/// a userspace-only restart.
5353
#[clap(long, conflicts_with = "check")]
5454
pub(crate) apply: bool,
55+
56+
/// Pipe download progress to stderr in a jsonl format.
57+
#[clap(long)]
58+
pub(crate) json: bool,
5559
}
5660

5761
/// Perform an switch operation
@@ -101,6 +105,10 @@ pub(crate) struct SwitchOpts {
101105

102106
/// Target image to use for the next boot.
103107
pub(crate) target: String,
108+
109+
/// Pipe download progress to stderr in a jsonl format.
110+
#[clap(long)]
111+
pub(crate) json: bool,
104112
}
105113

106114
/// Options controlling rollback
@@ -670,7 +678,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
670678
}
671679
}
672680
} else {
673-
let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet).await?;
681+
let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet, opts.json).await?;
674682
let staged_digest = staged_image.map(|s| s.digest().expect("valid digest in status"));
675683
let fetched_digest = &fetched.manifest_digest;
676684
tracing::debug!("staged: {staged_digest:?}");
@@ -764,7 +772,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> {
764772
}
765773
let new_spec = RequiredHostSpec::from_spec(&new_spec)?;
766774

767-
let fetched = crate::deploy::pull(repo, &target, None, opts.quiet).await?;
775+
let fetched = crate::deploy::pull(repo, &target, None, opts.quiet, opts.json).await?;
768776

769777
if !opts.retain {
770778
// By default, we prune the previous ostree ref so it will go away after later upgrades
@@ -826,7 +834,7 @@ async fn edit(opts: EditOpts) -> Result<()> {
826834
return crate::deploy::rollback(sysroot).await;
827835
}
828836

829-
let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet).await?;
837+
let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet, false).await?;
830838

831839
// TODO gc old layers here
832840

lib/src/deploy.rs

+83-5
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,16 @@ pub(crate) struct ImageState {
4545
pub(crate) ostree_commit: String,
4646
}
4747

48+
/// Download information
49+
#[derive(Debug,serde::Serialize)]
50+
pub struct JsonProgress {
51+
pub done_bytes: u64,
52+
pub download_bytes: u64,
53+
pub image_bytes: u64,
54+
pub n_layers: usize,
55+
pub n_layers_done: usize,
56+
}
57+
4858
impl<'a> RequiredHostSpec<'a> {
4959
/// Given a (borrowed) host specification, "unwrap" its internal
5060
/// options, giving a spec that is required to have a base container image.
@@ -234,13 +244,73 @@ async fn handle_layer_progress_print(
234244
}
235245
}
236246

247+
/// Write container fetch progress to standard output.
248+
async fn handle_layer_progress_print_jsonl(
249+
mut layers: tokio::sync::mpsc::Receiver<ostree_container::store::ImportProgress>,
250+
mut layer_bytes: tokio::sync::watch::Receiver<Option<ostree_container::store::LayerProgress>>,
251+
n_layers_to_fetch: usize,
252+
download_bytes: u64,
253+
image_bytes: u64,
254+
) {
255+
let mut total_read = 0u64;
256+
let mut layers_done: usize = 0;
257+
let mut last_json_written = std::time::Instant::now();
258+
loop {
259+
tokio::select! {
260+
// Always handle layer changes first.
261+
biased;
262+
layer = layers.recv() => {
263+
if let Some(l) = layer {
264+
if !l.is_starting() {
265+
let layer = descriptor_of_progress(&l);
266+
layers_done += 1;
267+
total_read += total_read.saturating_add(layer.size());
268+
}
269+
} else {
270+
// If the receiver is disconnected, then we're done
271+
break
272+
};
273+
},
274+
r = layer_bytes.changed() => {
275+
if r.is_err() {
276+
// If the receiver is disconnected, then we're done
277+
break
278+
}
279+
let bytes = layer_bytes.borrow();
280+
if let Some(bytes) = &*bytes {
281+
let done_bytes = total_read + bytes.fetched;
282+
283+
// Lets update the json output only on bytes fetched
284+
// They are common enough, anyhow. Debounce on time.
285+
let curr = std::time::Instant::now();
286+
if curr.duration_since(last_json_written).as_secs_f64() > 0.2 {
287+
let json = JsonProgress {
288+
done_bytes,
289+
download_bytes,
290+
image_bytes,
291+
n_layers: n_layers_to_fetch,
292+
n_layers_done: layers_done,
293+
};
294+
let json = serde_json::to_string(&json).unwrap();
295+
// Write to stderr so that consumer can filter this
296+
eprintln!("{}", json);
297+
last_json_written = curr;
298+
}
299+
}
300+
}
301+
}
302+
}
303+
}
304+
305+
237306
/// Wrapper for pulling a container image, wiring up status output.
238307
#[context("Pulling")]
239308
pub(crate) async fn pull(
240309
repo: &ostree::Repo,
241310
imgref: &ImageReference,
242311
target_imgref: Option<&OstreeImageReference>,
243312
quiet: bool,
313+
json: bool,
244314
) -> Result<Box<ImageState>> {
245315
let ostree_imgref = &OstreeImageReference::from(imgref.clone());
246316
let mut imp = new_importer(repo, ostree_imgref).await?;
@@ -262,14 +332,22 @@ pub(crate) async fn pull(
262332
let layers_to_fetch = prep.layers_to_fetch().collect::<Result<Vec<_>>>()?;
263333
let n_layers_to_fetch = layers_to_fetch.len();
264334
let download_bytes: u64 = layers_to_fetch.iter().map(|(l, _)| l.layer.size()).sum();
335+
let image_bytes: u64 = prep.all_layers().map(|l| l.layer.size()).sum();
265336

266-
let printer = (!quiet).then(|| {
337+
let printer = (!quiet || json).then(|| {
267338
let layer_progress = imp.request_progress();
268339
let layer_byte_progress = imp.request_layer_progress();
269-
tokio::task::spawn(async move {
270-
handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch, download_bytes)
271-
.await
272-
})
340+
if json {
341+
tokio::task::spawn(async move {
342+
handle_layer_progress_print_jsonl(layer_progress, layer_byte_progress, n_layers_to_fetch, download_bytes, image_bytes)
343+
.await
344+
})
345+
} else {
346+
tokio::task::spawn(async move {
347+
handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch, download_bytes)
348+
.await
349+
})
350+
}
273351
});
274352
let import = imp.import(prep).await;
275353
if let Some(printer) = printer {

lib/src/install.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -744,7 +744,7 @@ async fn install_container(
744744
let spec_imgref = ImageReference::from(src_imageref.clone());
745745
let repo = &sysroot.repo();
746746
repo.set_disable_fsync(true);
747-
let r = crate::deploy::pull(repo, &spec_imgref, Some(&state.target_imgref), false).await?;
747+
let r = crate::deploy::pull(repo, &spec_imgref, Some(&state.target_imgref), false, false).await?;
748748
repo.set_disable_fsync(false);
749749
r
750750
};

0 commit comments

Comments
 (0)