diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 59f67335..93ff35a5 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -8,8 +8,6 @@ on: jobs: build: runs-on: ubuntu-latest - strategy: - fail-fast: false steps: - uses: actions/checkout@v4.2.2 - uses: engineerd/configurator@v0.0.10 diff --git a/Cargo.toml b/Cargo.toml index a627684f..576a405a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,12 +24,18 @@ version = "0.14.0" [badges] maintenance = { status = "actively-developed" } +[[example]] +name = "blocking-get-manifest" +path = "examples/blocking-get-manifest/main.rs" +required-features = ["blocking"] + [features] -default = ["native-tls", "test-registry"] +default = ["native-tls"] native-tls = ["reqwest/native-tls"] rustls-tls = ["reqwest/rustls-tls"] rustls-tls-native-roots = ["reqwest/rustls-tls-native-roots"] trust-dns = ["reqwest/trust-dns"] +blocking = ["reqwest/blocking"] # This features is used by tests that use docker to create a registry test-registry = [] @@ -70,6 +76,6 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } tempfile = "3.3" # This should stay pinned here until testcontainers makes sure all of its deps using rustls are # using the ring feature. Otherwise this fails to compile on Windows -testcontainers = "0.23" +testcontainers = { version = "0.23", features = ["blocking"] } tokio = { version = "1.21", features = ["macros", "fs", "rt-multi-thread"] } tokio-util = { version = "0.7.4", features = ["compat"] } diff --git a/examples/blocking-get-manifest/main.rs b/examples/blocking-get-manifest/main.rs new file mode 100644 index 00000000..35f2462b --- /dev/null +++ b/examples/blocking-get-manifest/main.rs @@ -0,0 +1,100 @@ +use oci_client::{blocking::Client, secrets::RegistryAuth, Reference}; + +use clap::Parser; +use docker_credential::{CredentialRetrievalError, DockerCredential}; +use tracing::{debug, warn}; +use tracing_subscriber::prelude::*; +use tracing_subscriber::{fmt, EnvFilter}; + +/// Pull a WebAssembly module from a OCI container registry +#[derive(Parser, Debug)] +#[clap(author, version, about, long_about = None)] +pub(crate) struct Cli { + /// Enable verbose mode + #[clap(short, long)] + pub verbose: bool, + + /// Perform anonymous operation, by default the tool tries to reuse the docker credentials read + /// from the default docker file + #[clap(short, long)] + pub anonymous: bool, + + /// Pull image from registry using HTTP instead of HTTPS + #[clap(short, long)] + pub insecure: bool, + + /// Enable json output + #[clap(long)] + pub json: bool, + + /// Name of the image to pull + image: String, +} + +fn build_auth(reference: &Reference, cli: &Cli) -> RegistryAuth { + let server = reference + .resolve_registry() + .strip_suffix('/') + .unwrap_or_else(|| reference.resolve_registry()); + + if cli.anonymous { + return RegistryAuth::Anonymous; + } + + match docker_credential::get_credential(server) { + Err(CredentialRetrievalError::ConfigNotFound) => RegistryAuth::Anonymous, + Err(CredentialRetrievalError::NoCredentialConfigured) => RegistryAuth::Anonymous, + Err(e) => panic!("Error handling docker configuration file: {}", e), + Ok(DockerCredential::UsernamePassword(username, password)) => { + debug!("Found docker credentials"); + RegistryAuth::Basic(username, password) + } + Ok(DockerCredential::IdentityToken(_)) => { + warn!("Cannot use contents of docker config, identity token not supported. Using anonymous auth"); + RegistryAuth::Anonymous + } + } +} + +fn build_client_config(cli: &Cli) -> oci_client::blocking::ClientConfig { + let protocol = if cli.insecure { + oci_client::blocking::ClientProtocol::Http + } else { + oci_client::blocking::ClientProtocol::Https + }; + + oci_client::blocking::ClientConfig { + protocol, + ..Default::default() + } +} + +pub fn main() { + let cli = Cli::parse(); + + // setup logging + let level_filter = if cli.verbose { "debug" } else { "info" }; + let filter_layer = EnvFilter::new(level_filter); + tracing_subscriber::registry() + .with(filter_layer) + .with(fmt::layer().with_writer(std::io::stderr)) + .init(); + + let reference: Reference = cli.image.parse().expect("Not a valid image reference"); + let auth = build_auth(&reference, &cli); + + let client_config = build_client_config(&cli); + let mut client = Client::new(client_config); + + let (manifest, _) = client + .pull_manifest(&reference, &auth) + .expect("Cannot pull manifest"); + + if cli.json { + serde_json::to_writer_pretty(std::io::stdout(), &manifest) + .expect("Cannot serialize manifest to JSON"); + println!(); + } else { + println!("{}", manifest); + } +} diff --git a/justfile b/justfile index 7d33b756..4cdf0bc7 100644 --- a/justfile +++ b/justfile @@ -4,5 +4,5 @@ build +FLAGS='': test: cargo fmt --all -- --check cargo clippy --workspace - cargo test --workspace --lib --tests + cargo test --workspace --lib --tests --all-features cargo test --doc --all diff --git a/src/blocking.rs b/src/blocking.rs new file mode 100644 index 00000000..e229d1e0 --- /dev/null +++ b/src/blocking.rs @@ -0,0 +1,2830 @@ +//! OCI distribution client for fetching oci images from an OCI compliant remote store +use std::collections::HashMap; +use std::convert::TryFrom; +use std::io::Write; +use std::time::Duration; + +use http::header::RANGE; +use http::HeaderValue; +use http_auth::{parser::ChallengeParser, ChallengeRef}; +use olpc_cjson::CanonicalFormatter; +use reqwest::blocking::{RequestBuilder, Response}; +use reqwest::header::HeaderMap; +use reqwest::{NoProxy, Proxy, Url}; +use serde::Serialize; +use tracing::{debug, trace, warn}; + +pub use crate::types::*; + +use crate::digest::{digest_header_value, validate_digest, Digest, Digester}; +use crate::errors::*; +use crate::manifest::{ + ImageIndexEntry, OciImageIndex, OciImageManifest, OciManifest, Versioned, + IMAGE_MANIFEST_LIST_MEDIA_TYPE, IMAGE_MANIFEST_MEDIA_TYPE, OCI_IMAGE_INDEX_MEDIA_TYPE, + OCI_IMAGE_MEDIA_TYPE, +}; +use crate::secrets::RegistryAuth; +use crate::secrets::*; +use crate::sha256_digest; +use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, SyncTokenCache}; +use crate::Reference; + +const MIME_TYPES_DISTRIBUTION_MANIFEST: &[&str] = &[ + IMAGE_MANIFEST_MEDIA_TYPE, + IMAGE_MANIFEST_LIST_MEDIA_TYPE, + OCI_IMAGE_MEDIA_TYPE, + OCI_IMAGE_INDEX_MEDIA_TYPE, +]; + +const PUSH_CHUNK_MAX_SIZE: usize = 4096 * 1024; + +/// Default value for `ClientConfig::max_concurrent_upload` +pub const DEFAULT_MAX_CONCURRENT_UPLOAD: usize = 16; + +/// Default value for `ClientConfig::max_concurrent_download` +pub const DEFAULT_MAX_CONCURRENT_DOWNLOAD: usize = 16; + +/// Default value for `ClientConfig:default_token_expiration_secs` +pub const DEFAULT_TOKEN_EXPIRATION_SECS: usize = 60; + +static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); + +/// The OCI client connects to an OCI registry and fetches OCI images. +/// +/// An OCI registry is a container registry that adheres to the OCI Distribution +/// specification. DockerHub is one example, as are ACR and GCR. This client +/// provides a native Rust implementation for pulling OCI images. +/// +/// Some OCI registries support completely anonymous access. But most require +/// at least an Oauth2 handshake. Typically, you will want to create a new +/// client, and then run the `auth()` method, which will attempt to get +/// a read-only bearer token. From there, pulling images can be done with +/// the `pull_*` functions. +/// +/// For true anonymous access, you can skip `auth()`. This is not recommended +/// unless you are sure that the remote registry does not require Oauth2. +pub struct Client { + config: ClientConfig, + // Registry -> RegistryAuth + auth_store: HashMap, + tokens: SyncTokenCache, + client: reqwest::blocking::Client, + push_chunk_size: usize, +} + +impl Default for Client { + fn default() -> Self { + Self { + config: ClientConfig::default(), + auth_store: HashMap::new(), + tokens: SyncTokenCache::new(DEFAULT_TOKEN_EXPIRATION_SECS), + client: reqwest::blocking::Client::default(), + push_chunk_size: PUSH_CHUNK_MAX_SIZE, + } + } +} + +/// A source that can provide a `ClientConfig`. +/// If you are using this crate in your own application, you can implement this +/// trait on your configuration type so that it can be passed to `Client::from_source`. +pub trait ClientConfigSource { + /// Provides a `ClientConfig`. + fn client_config(&self) -> ClientConfig; +} + +impl TryFrom for Client { + type Error = OciDistributionError; + + fn try_from(config: ClientConfig) -> std::result::Result { + #[allow(unused_mut)] + let mut client_builder = reqwest::blocking::Client::builder(); + #[cfg(not(target_arch = "wasm32"))] + let mut client_builder = + client_builder.danger_accept_invalid_certs(config.accept_invalid_certificates); + + client_builder = match () { + #[cfg(all(feature = "native-tls", not(target_arch = "wasm32")))] + () => client_builder.danger_accept_invalid_hostnames(config.accept_invalid_hostnames), + #[cfg(any(not(feature = "native-tls"), target_arch = "wasm32"))] + () => client_builder, + }; + + #[cfg(not(target_arch = "wasm32"))] + for c in &config.extra_root_certificates { + let cert = match c.encoding { + CertificateEncoding::Der => reqwest::Certificate::from_der(c.data.as_slice())?, + CertificateEncoding::Pem => reqwest::Certificate::from_pem(c.data.as_slice())?, + }; + client_builder = client_builder.add_root_certificate(cert); + } + + if let Some(timeout) = config.read_timeout { + client_builder = client_builder.timeout(timeout); + } + if let Some(timeout) = config.connect_timeout { + client_builder = client_builder.connect_timeout(timeout); + } + + client_builder = client_builder.user_agent(config.user_agent); + + if let Some(proxy_addr) = &config.https_proxy { + let no_proxy = config + .no_proxy + .as_ref() + .and_then(|no_proxy| NoProxy::from_string(no_proxy)); + let proxy = Proxy::https(proxy_addr)?.no_proxy(no_proxy); + client_builder = client_builder.proxy(proxy); + } + + let default_token_expiration_secs = config.default_token_expiration_secs; + Ok(Self { + config: config, + tokens: SyncTokenCache::new(default_token_expiration_secs), + client: client_builder.build()?, + push_chunk_size: PUSH_CHUNK_MAX_SIZE, + ..Default::default() + }) + } +} + +impl Client { + /// Create a new client with the supplied config + pub fn new(config: ClientConfig) -> Self { + let default_token_expiration_secs = config.default_token_expiration_secs; + Client::try_from(config).unwrap_or_else(|err| { + warn!("Cannot create OCI client from config: {:?}", err); + warn!("Creating client with default configuration"); + Self { + tokens: SyncTokenCache::new(default_token_expiration_secs), + push_chunk_size: PUSH_CHUNK_MAX_SIZE, + ..Default::default() + } + }) + } + + /// Create a new client with the supplied config + pub fn from_source(config_source: &impl ClientConfigSource) -> Self { + Self::new(config_source.client_config()) + } + + fn store_auth(&mut self, registry: &str, auth: RegistryAuth) { + self.auth_store.insert(registry.to_string(), auth); + } + + fn is_stored_auth(&self, registry: &str) -> bool { + self.auth_store.contains_key(registry) + } + + /// Store the authentication information for this registry if it's not already stored in the client. + /// + /// Most of the time, you don't need to call this method directly. It's called by other + /// methods (where you have to provide the authentication information as parameter). + /// + /// But if you want to pull/push a blob without calling any of the other methods first, which would + /// store the authentication information, you can call this method to store the authentication + /// information manually. + pub fn store_auth_if_needed(&mut self, registry: &str, auth: &RegistryAuth) { + if !self.is_stored_auth(registry) { + self.store_auth(registry, auth.clone()); + } + } + + /// Checks if we got a token, if we don't - create it and store it in cache. + fn get_auth_token( + &mut self, + reference: &Reference, + op: RegistryOperation, + ) -> Option { + let registry = reference.resolve_registry(); + let auth = self.auth_store.get(registry)?.clone(); + match self.tokens.get(reference, op) { + Some(token) => Some(token), + None => { + let token = self._auth(reference, &auth, op).ok()??; + self.tokens.insert(reference, op, token.clone()); + Some(token) + } + } + } + + /// Fetches the available Tags for the given Reference + /// + /// The client will check if it's already been authenticated and if + /// not will attempt to do. + pub fn list_tags( + &mut self, + image: &Reference, + auth: &RegistryAuth, + n: Option, + last: Option<&str>, + ) -> Result { + let op = RegistryOperation::Pull; + let url = self.to_list_tags_url(image); + + self.store_auth_if_needed(image.resolve_registry(), auth); + + let request = self.client.get(&url); + let request = if let Some(num) = n { + request.query(&[("n", num)]) + } else { + request + }; + let request = if let Some(l) = last { + request.query(&[("last", l)]) + } else { + request + }; + let mut request = RequestBuilderWrapper { + client: self, + request_builder: request, + }; + let res = request + .apply_auth(image, op)? + .into_request_builder() + .send()?; + let status = res.status(); + let body = res.bytes()?; + + validate_registry_response(status, &body, &url)?; + + Ok(serde_json::from_str(std::str::from_utf8(&body)?)?) + } + + /// Pull an image and return the bytes + /// + /// The client will check if it's already been authenticated and if + /// not will attempt to do. + pub fn pull( + &mut self, + image: &Reference, + auth: &RegistryAuth, + accepted_media_types: Vec<&str>, + ) -> Result { + debug!("Pulling image: {:?}", image); + self.store_auth_if_needed(image.resolve_registry(), auth); + + let (manifest, digest, config) = self._pull_manifest_and_config(image)?; + + self.validate_layers(&manifest, accepted_media_types)?; + + let layers = manifest.layers.iter().try_fold(vec![], |mut acc, layer| { + let mut out: Vec = Vec::new(); + debug!("Pulling image layer"); + match self.pull_blob(image, layer, &mut out) { + Ok(_) => { + acc.push(ImageLayer::new( + out, + layer.media_type.clone(), + layer.annotations.clone(), + )); + Ok(acc) + } + Err(e) => { + warn!(error = ?e, "Failed to pull image layer"); + return Err(e); + } + } + })?; + + Ok(ImageData { + layers, + manifest: Some(manifest), + config, + digest: Some(digest), + }) + } + + /// Push an image and return the uploaded URL of the image + /// + /// The client will check if it's already been authenticated and if + /// not will attempt to do. + /// + /// If a manifest is not provided, the client will attempt to generate + /// it from the provided image and config data. + /// + /// Returns pullable URL for the image + pub fn push( + &mut self, + image_ref: &Reference, + layers: &[ImageLayer], + config: Config, + auth: &RegistryAuth, + manifest: Option, + ) -> Result { + debug!("Pushing image: {:?}", image_ref); + self.store_auth_if_needed(image_ref.resolve_registry(), auth); + + let manifest: OciImageManifest = match manifest { + Some(m) => m, + None => OciImageManifest::build(layers, &config, None), + }; + + // Upload layers + layers.iter().try_for_each(|layer| { + let digest = layer.sha256_digest(); + self.push_blob(image_ref, &layer.data, &digest)?; + Result::Ok(()) + })?; + + let config_url = self.push_blob(image_ref, &config.data, &manifest.config.digest)?; + let manifest_url = self.push_manifest(image_ref, &manifest.into())?; + + Ok(PushResponse { + config_url, + manifest_url, + }) + } + + /// Pushes a blob to the registry + pub fn push_blob( + &mut self, + image_ref: &Reference, + data: &[u8], + digest: &str, + ) -> Result { + if self.config.use_monolithic_push { + return self.push_blob_monolithically(image_ref, data, digest); + } + + match self.push_blob_chunked(image_ref, data, digest) { + Ok(url) => Ok(url), + Err(OciDistributionError::SpecViolationError(violation)) => { + warn!(?violation, "Registry is not respecting the OCI Distribution Specification when doing chunked push operations"); + warn!("Attempting monolithic push"); + self.push_blob_monolithically(image_ref, data, digest) + } + Err(e) => Err(e), + } + } + + /// Pushes a blob to the registry as a monolith + /// + /// Returns the pullable location of the blob + fn push_blob_monolithically( + &mut self, + image: &Reference, + blob_data: &[u8], + blob_digest: &str, + ) -> Result { + let location = self.begin_push_monolithical_session(image)?; + self.push_monolithically(&location, image, blob_data, blob_digest) + } + + /// Pushes a blob to the registry as a series of chunks + /// + /// Returns the pullable location of the blob + fn push_blob_chunked( + &mut self, + image: &Reference, + blob_data: &[u8], + blob_digest: &str, + ) -> Result { + let mut location = self.begin_push_chunked_session(image)?; + let mut start: usize = 0; + loop { + (location, start) = self.push_chunk(&location, image, blob_data, start)?; + if start >= blob_data.len() { + break; + } + } + self.end_push_chunked_session(&location, image, blob_digest) + } + + /// Perform an OAuth v2 auth request if necessary. + /// + /// This performs authorization and then stores the token internally to be used + /// on other requests. + pub fn auth( + &mut self, + image: &Reference, + authentication: &RegistryAuth, + operation: RegistryOperation, + ) -> Result> { + self.store_auth_if_needed(image.resolve_registry(), authentication); + // preserve old caching behavior + match self._auth(image, authentication, operation) { + Ok(Some(RegistryTokenType::Bearer(token))) => { + self.tokens + .insert(image, operation, RegistryTokenType::Bearer(token.clone())); + Ok(Some(token.token().to_string())) + } + Ok(Some(RegistryTokenType::Basic(username, password))) => { + self.tokens.insert( + image, + operation, + RegistryTokenType::Basic(username, password), + ); + Ok(None) + } + Ok(None) => Ok(None), + Err(e) => Err(e), + } + } + + /// Internal auth that retrieves token. + fn _auth( + &self, + image: &Reference, + authentication: &RegistryAuth, + operation: RegistryOperation, + ) -> Result> { + debug!("Authorizing for image: {:?}", image); + // The version request will tell us where to go. + let url = format!( + "{}://{}/v2/", + self.config.protocol.scheme_for(image.resolve_registry()), + image.resolve_registry() + ); + debug!(?url); + let res = self.client.get(&url).send()?; + let dist_hdr = match res.headers().get(reqwest::header::WWW_AUTHENTICATE) { + Some(h) => h, + None => return Ok(None), + }; + + let challenge = match BearerChallenge::try_from(dist_hdr) { + Ok(c) => c, + Err(e) => { + debug!(error = ?e, "Falling back to HTTP Basic Auth"); + if let RegistryAuth::Basic(username, password) = authentication { + return Ok(Some(RegistryTokenType::Basic( + username.to_string(), + password.to_string(), + ))); + } + return Ok(None); + } + }; + + // Allow for either push or pull authentication + let scope = match operation { + RegistryOperation::Pull => format!("repository:{}:pull", image.repository()), + RegistryOperation::Push => format!("repository:{}:pull,push", image.repository()), + }; + + let realm = challenge.realm.as_ref(); + let service = challenge.service.as_ref(); + let mut query = vec![("scope", &scope)]; + + if let Some(s) = service { + query.push(("service", s)) + } + + // TODO: At some point in the future, we should support sending a secret to the + // server for auth. This particular workflow is for read-only public auth. + debug!(?realm, ?service, ?scope, "Making authentication call"); + + let auth_res = self + .client + .get(realm) + .query(&query) + .apply_authentication(authentication) + .send()?; + + match auth_res.status() { + reqwest::StatusCode::OK => { + let text = auth_res.text()?; + debug!("Received response from auth request: {}", text); + let token: RegistryToken = serde_json::from_str(&text) + .map_err(|e| OciDistributionError::RegistryTokenDecodeError(e.to_string()))?; + debug!("Successfully authorized for image '{:?}'", image); + Ok(Some(RegistryTokenType::Bearer(token))) + } + _ => { + let reason = auth_res.text()?; + debug!("Failed to authenticate for image '{:?}': {}", image, reason); + Err(OciDistributionError::AuthenticationFailure(reason)) + } + } + } + + /// Fetch a manifest's digest from the remote OCI Distribution service. + /// + /// If the connection has already gone through authentication, this will + /// use the bearer token. Otherwise, this will attempt an anonymous pull. + /// + /// Will first attempt to read the `Docker-Content-Digest` header using a + /// HEAD request. If this header is not present, will make a second GET + /// request and return the SHA256 of the response body. + pub fn fetch_manifest_digest( + &mut self, + image: &Reference, + auth: &RegistryAuth, + ) -> Result { + self.store_auth_if_needed(image.resolve_registry(), auth); + + let url = self.to_v2_manifest_url(image); + debug!("HEAD image manifest from {}", url); + let res = RequestBuilderWrapper::from_client(self, |client| client.head(&url)) + .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)? + .apply_auth(image, RegistryOperation::Pull)? + .into_request_builder() + .send()?; + + if let Some(digest) = digest_header_value(res.headers().clone())? { + let status = res.status(); + let body = res.bytes()?; + validate_registry_response(status, &body, &url)?; + + // If the reference has a digest and the digest header has a matching algorithm, compare + // them and return an error if they don't match. + if let Some(img_digest) = image.digest() { + let header_digest = Digest::new(&digest)?; + let image_digest = Digest::new(img_digest)?; + if header_digest.algorithm == image_digest.algorithm + && header_digest != image_digest + { + return Err(DigestError::VerificationError { + expected: img_digest.to_string(), + actual: digest, + } + .into()); + } + } + + Ok(digest) + } else { + debug!("GET image manifest from {}", url); + let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url)) + .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)? + .apply_auth(image, RegistryOperation::Pull)? + .into_request_builder() + .send()?; + let status = res.status(); + trace!(headers = ?res.headers(), "Got Headers"); + let headers = res.headers().clone(); + let body = res.bytes()?; + validate_registry_response(status, &body, &url)?; + + validate_digest(&body, digest_header_value(headers)?, image.digest()) + .map_err(OciDistributionError::from) + } + } + + fn validate_layers( + &self, + manifest: &OciImageManifest, + accepted_media_types: Vec<&str>, + ) -> Result<()> { + if manifest.layers.is_empty() { + return Err(OciDistributionError::PullNoLayersError); + } + + for layer in &manifest.layers { + if !accepted_media_types.iter().any(|i| i.eq(&layer.media_type)) { + return Err(OciDistributionError::IncompatibleLayerMediaTypeError( + layer.media_type.clone(), + )); + } + } + + Ok(()) + } + + /// Pull a manifest from the remote OCI Distribution service. + /// + /// The client will check if it's already been authenticated and if + /// not will attempt to do. + /// + /// A Tuple is returned containing the [OciImageManifest](crate::manifest::OciImageManifest) + /// and the manifest content digest hash. + /// + /// If a multi-platform Image Index manifest is encountered, a platform-specific + /// Image manifest will be selected using the client's default platform resolution. + pub fn pull_image_manifest( + &mut self, + image: &Reference, + auth: &RegistryAuth, + ) -> Result<(OciImageManifest, String)> { + self.store_auth_if_needed(image.resolve_registry(), auth); + + self._pull_image_manifest(image) + } + + /// Pull a manifest from the remote OCI Distribution service without parsing it. + /// + /// The client will check if it's already been authenticated and if + /// not will attempt to do. + /// + /// A Tuple is returned containing raw byte representation of the manifest + /// and the manifest content digest. + pub fn pull_manifest_raw( + &mut self, + image: &Reference, + auth: &RegistryAuth, + accepted_media_types: &[&str], + ) -> Result<(Vec, String)> { + self.store_auth_if_needed(image.resolve_registry(), auth); + + self._pull_manifest_raw(image, accepted_media_types) + } + + /// Pull a manifest from the remote OCI Distribution service. + /// + /// The client will check if it's already been authenticated and if + /// not will attempt to do. + /// + /// A Tuple is returned containing the [Manifest](crate::manifest::OciImageManifest) + /// and the manifest content digest hash. + pub fn pull_manifest( + &mut self, + image: &Reference, + auth: &RegistryAuth, + ) -> Result<(OciManifest, String)> { + self.store_auth_if_needed(image.resolve_registry(), auth); + + self._pull_manifest(image) + } + + /// Pull an image manifest from the remote OCI Distribution service. + /// + /// If the connection has already gone through authentication, this will + /// use the bearer token. Otherwise, this will attempt an anonymous pull. + /// + /// If a multi-platform Image Index manifest is encountered, a platform-specific + /// Image manifest will be selected using the client's default platform resolution. + fn _pull_image_manifest(&mut self, image: &Reference) -> Result<(OciImageManifest, String)> { + let (manifest, digest) = self._pull_manifest(image)?; + match manifest { + OciManifest::Image(image_manifest) => Ok((image_manifest, digest)), + OciManifest::ImageIndex(image_index_manifest) => { + debug!("Inspecting Image Index Manifest"); + let digest = if let Some(resolver) = &self.config.platform_resolver { + resolver(&image_index_manifest.manifests) + } else { + return Err(OciDistributionError::ImageIndexParsingNoPlatformResolverError); + }; + + match digest { + Some(digest) => { + debug!("Selected manifest entry with digest: {}", digest); + let manifest_entry_reference = image.clone_with_digest(digest.clone()); + self._pull_manifest(&manifest_entry_reference).and_then( + |(manifest, _digest)| match manifest { + OciManifest::Image(manifest) => Ok((manifest, digest)), + OciManifest::ImageIndex(_) => { + Err(OciDistributionError::ImageManifestNotFoundError( + "received Image Index manifest instead".to_string(), + )) + } + }, + ) + } + None => Err(OciDistributionError::ImageManifestNotFoundError( + "no entry found in image index manifest matching client's default platform" + .to_string(), + )), + } + } + } + } + + /// Pull a manifest from the remote OCI Distribution service without parsing it. + /// + /// If the connection has already gone through authentication, this will + /// use the bearer token. Otherwise, this will attempt an anonymous pull. + fn _pull_manifest_raw( + &mut self, + image: &Reference, + accepted_media_types: &[&str], + ) -> Result<(Vec, String)> { + let url = self.to_v2_manifest_url(image); + debug!("Pulling image manifest from {}", url); + + let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url)) + .apply_accept(accepted_media_types)? + .apply_auth(image, RegistryOperation::Pull)? + .into_request_builder() + .send()?; + let status = res.status(); + let headers = res.headers().clone(); + let body = res.bytes()?; + + validate_registry_response(status, &body, &url)?; + + let digest_header = digest_header_value(headers)?; + let digest = validate_digest(&body, digest_header, image.digest())?; + + Ok((body.to_vec(), digest)) + } + + /// Pull a manifest from the remote OCI Distribution service. + /// + /// If the connection has already gone through authentication, this will + /// use the bearer token. Otherwise, this will attempt an anonymous pull. + fn _pull_manifest(&mut self, image: &Reference) -> Result<(OciManifest, String)> { + let (body, digest) = self._pull_manifest_raw(image, MIME_TYPES_DISTRIBUTION_MANIFEST)?; + + self.validate_image_manifest(&body)?; + + debug!("Parsing response as Manifest"); + let manifest = serde_json::from_slice(&body) + .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?; + Ok((manifest, digest)) + } + + fn validate_image_manifest(&self, body: &[u8]) -> Result<()> { + let versioned: Versioned = serde_json::from_slice(body) + .map_err(|e| OciDistributionError::VersionedParsingError(e.to_string()))?; + debug!(?versioned, "validating manifest"); + if versioned.schema_version != 2 { + return Err(OciDistributionError::UnsupportedSchemaVersionError( + versioned.schema_version, + )); + } + if let Some(media_type) = versioned.media_type { + if media_type != IMAGE_MANIFEST_MEDIA_TYPE + && media_type != OCI_IMAGE_MEDIA_TYPE + && media_type != IMAGE_MANIFEST_LIST_MEDIA_TYPE + && media_type != OCI_IMAGE_INDEX_MEDIA_TYPE + { + return Err(OciDistributionError::UnsupportedMediaTypeError(media_type)); + } + } + + Ok(()) + } + + /// Pull a manifest and its config from the remote OCI Distribution service. + /// + /// The client will check if it's already been authenticated and if + /// not will attempt to do. + /// + /// A Tuple is returned containing the [OciImageManifest](crate::manifest::OciImageManifest), + /// the manifest content digest hash and the contents of the manifests config layer + /// as a String. + pub fn pull_manifest_and_config( + &mut self, + image: &Reference, + auth: &RegistryAuth, + ) -> Result<(OciImageManifest, String, String)> { + self.store_auth_if_needed(image.resolve_registry(), auth); + + self._pull_manifest_and_config(image) + .and_then(|(manifest, digest, config)| { + Ok(( + manifest, + digest, + String::from_utf8(config.data).map_err(|e| { + OciDistributionError::GenericError(Some(format!( + "Cannot not UTF8 compliant: {}", + e + ))) + })?, + )) + }) + } + + fn _pull_manifest_and_config( + &mut self, + image: &Reference, + ) -> Result<(OciImageManifest, String, Config)> { + let (manifest, digest) = self._pull_image_manifest(image)?; + + let mut out: Vec = Vec::new(); + debug!("Pulling config layer"); + self.pull_blob(image, &manifest.config, &mut out)?; + let media_type = manifest.config.media_type.clone(); + let annotations = manifest.annotations.clone(); + Ok((manifest, digest, Config::new(out, media_type, annotations))) + } + + /// Push a manifest list to an OCI registry. + /// + /// This pushes a manifest list to an OCI registry. + pub fn push_manifest_list( + &mut self, + reference: &Reference, + auth: &RegistryAuth, + manifest: OciImageIndex, + ) -> Result { + self.store_auth_if_needed(reference.resolve_registry(), auth); + self.push_manifest(reference, &OciManifest::ImageIndex(manifest)) + } + + /// Pull a single layer from an OCI registry. + /// + /// This pulls the layer for a particular image that is identified by the given layer + /// descriptor. The layer descriptor can be anything that can be referenced as a layer + /// descriptor. The image reference is used to find the repository and the registry, but it is + /// not used to verify that the digest is a layer inside of the image. (The manifest is used for + /// that.) + pub fn pull_blob( + &mut self, + image: &Reference, + layer: impl AsLayerDescriptor, + mut out: T, + ) -> Result<()> { + let response = self.pull_blob_response(image, &layer, None, None)?; + + let mut maybe_header_digester = digest_header_value(response.headers().clone())? + .map(|digest| Digester::new(&digest).map(|d| (d, digest))) + .transpose()?; + + // With a blob pull, we need to use the digest from the layer and not the image + let layer_digest = layer.as_layer_descriptor().digest.to_string(); + let mut layer_digester = Digester::new(&layer_digest)?; + + let bytes = response.error_for_status()?.bytes()?; + if let Some((ref mut digester, _)) = maybe_header_digester.as_mut() { + digester.update(&bytes); + } + layer_digester.update(&bytes); + out.write_all(&bytes)?; + + if let Some((mut digester, expected)) = maybe_header_digester.take() { + let digest = digester.finalize(); + + if digest != expected { + return Err(DigestError::VerificationError { + expected, + actual: digest, + } + .into()); + } + } + + let digest = layer_digester.finalize(); + if digest != layer_digest { + return Err(DigestError::VerificationError { + expected: layer_digest, + actual: digest, + } + .into()); + } + + Ok(()) + } + + /// Pull a single layer from an OCI registry. + fn pull_blob_response( + &mut self, + image: &Reference, + layer: impl AsLayerDescriptor, + offset: Option, + length: Option, + ) -> Result { + let layer = layer.as_layer_descriptor(); + let url = self.to_v2_blob_url(image, layer.digest); + + let mut request = RequestBuilderWrapper::from_client(self, |client| client.get(&url)) + .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)? + .apply_auth(image, RegistryOperation::Pull)? + .into_request_builder(); + if let (Some(off), Some(len)) = (offset, length) { + let end = (off + len).saturating_sub(1); + request = request.header( + RANGE, + HeaderValue::from_str(&format!("bytes={off}-{end}")).unwrap(), + ); + } else if let Some(offset) = offset { + request = request.header( + RANGE, + HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(), + ); + } + let mut response = request.send()?; + + if let Some(urls) = &layer.urls { + for url in urls { + if response.error_for_status_ref().is_ok() { + break; + } + + let url = Url::parse(url) + .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?; + + if url.scheme() == "http" || url.scheme() == "https" { + // NOTE: we must not authenticate on additional URLs as those + // can be abused to leak credentials or tokens. Please + // refer to CVE-2020-15157 for more information. + request = + RequestBuilderWrapper::from_client(self, |client| client.get(url.clone())) + .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)? + .into_request_builder(); + if let Some(offset) = offset { + request = request.header( + RANGE, + HeaderValue::from_str(&format!("bytes={offset}-")).unwrap(), + ); + } + response = request.send()? + } + } + } + + Ok(response) + } + + /// Begins a session to push an image to registry in a monolithical way + /// + /// Returns URL with session UUID + fn begin_push_monolithical_session(&mut self, image: &Reference) -> Result { + let url = &self.to_v2_blob_upload_url(image); + debug!(?url, "begin_push_monolithical_session"); + let res = RequestBuilderWrapper::from_client(self, |client| client.post(url)) + .apply_auth(image, RegistryOperation::Push)? + .into_request_builder() + // We set "Content-Length" to 0 here even though the OCI Distribution + // spec does not strictly require that. In practice we have seen that + // certain registries require "Content-Length" to be present for all + // types of push sessions. + .header("Content-Length", 0) + .send()?; + + // OCI spec requires the status code be 202 Accepted to successfully begin the push process + self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED) + } + + /// Begins a session to push an image to registry as a series of chunks + /// + /// Returns URL with session UUID + fn begin_push_chunked_session(&mut self, image: &Reference) -> Result { + let url = &self.to_v2_blob_upload_url(image); + debug!(?url, "begin_push_session"); + let res = RequestBuilderWrapper::from_client(self, |client| client.post(url)) + .apply_auth(image, RegistryOperation::Push)? + .into_request_builder() + .header("Content-Length", 0) + .send()?; + + // OCI spec requires the status code be 202 Accepted to successfully begin the push process + self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED) + } + + /// Closes the chunked push session + /// + /// Returns the pullable URL for the image + fn end_push_chunked_session( + &mut self, + location: &str, + image: &Reference, + digest: &str, + ) -> Result { + let url = Url::parse_with_params(location, &[("digest", digest)]) + .map_err(|e| OciDistributionError::GenericError(Some(e.to_string())))?; + let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone())) + .apply_auth(image, RegistryOperation::Push)? + .into_request_builder() + .header("Content-Length", 0) + .send()?; + self.extract_location_header(image, res, &reqwest::StatusCode::CREATED) + } + + /// Pushes a layer to a registry as a monolithical blob. + /// + /// Returns the URL location for the next layer + fn push_monolithically( + &mut self, + location: &str, + image: &Reference, + layer: &[u8], + blob_digest: &str, + ) -> Result { + let mut url = Url::parse(location).unwrap(); + url.query_pairs_mut().append_pair("digest", blob_digest); + let url = url.to_string(); + + debug!(size = layer.len(), location = ?url, "Pushing monolithically"); + if layer.is_empty() { + return Err(OciDistributionError::PushNoDataError); + }; + let mut headers = HeaderMap::new(); + headers.insert( + "Content-Length", + format!("{}", layer.len()).parse().unwrap(), + ); + headers.insert("Content-Type", "application/octet-stream".parse().unwrap()); + + let res = RequestBuilderWrapper::from_client(self, |client| client.put(&url)) + .apply_auth(image, RegistryOperation::Push)? + .into_request_builder() + .headers(headers) + .body(layer.to_vec()) + .send()?; + + // Returns location + self.extract_location_header(image, res, &reqwest::StatusCode::CREATED) + } + + /// Pushes a single chunk of a blob to a registry, + /// as part of a chunked blob upload. + /// + /// Returns the URL location for the next chunk + fn push_chunk( + &mut self, + location: &str, + image: &Reference, + blob_data: &[u8], + start_byte: usize, + ) -> Result<(String, usize)> { + if blob_data.is_empty() { + return Err(OciDistributionError::PushNoDataError); + }; + let end_byte = if (start_byte + self.push_chunk_size) < blob_data.len() { + start_byte + self.push_chunk_size - 1 + } else { + blob_data.len() - 1 + }; + let body = blob_data[start_byte..end_byte + 1].to_vec(); + let mut headers = HeaderMap::new(); + headers.insert( + "Content-Range", + format!("{}-{}", start_byte, end_byte).parse().unwrap(), + ); + headers.insert("Content-Length", format!("{}", body.len()).parse().unwrap()); + headers.insert("Content-Type", "application/octet-stream".parse().unwrap()); + + debug!( + ?start_byte, + ?end_byte, + blob_data_len = blob_data.len(), + body_len = body.len(), + ?location, + ?headers, + "Pushing chunk" + ); + + let res = RequestBuilderWrapper::from_client(self, |client| client.patch(location)) + .apply_auth(image, RegistryOperation::Push)? + .into_request_builder() + .headers(headers) + .body(body) + .send()?; + + // Returns location for next chunk and the start byte for the next range + Ok(( + self.extract_location_header(image, res, &reqwest::StatusCode::ACCEPTED)?, + end_byte + 1, + )) + } + + /// Mounts a blob to the provided reference, from the given source + pub fn mount_blob( + &mut self, + image: &Reference, + source: &Reference, + digest: &str, + ) -> Result<()> { + let base_url = self.to_v2_blob_upload_url(image); + let url = Url::parse_with_params( + &base_url, + &[("mount", digest), ("from", source.repository())], + ) + .map_err(|e| OciDistributionError::UrlParseError(e.to_string()))?; + + let res = RequestBuilderWrapper::from_client(self, |client| client.post(url.clone())) + .apply_auth(image, RegistryOperation::Push)? + .into_request_builder() + .send()?; + + self.extract_location_header(image, res, &reqwest::StatusCode::CREATED)?; + + Ok(()) + } + + /// Pushes the manifest for a specified image + /// + /// Returns pullable manifest URL + pub fn push_manifest(&mut self, image: &Reference, manifest: &OciManifest) -> Result { + let mut headers = HeaderMap::new(); + let content_type = manifest.content_type(); + headers.insert("Content-Type", content_type.parse().unwrap()); + + // Serialize the manifest with a canonical json formatter, as described at + // https://github.com/opencontainers/image-spec/blob/main/considerations.md#json + let mut body = Vec::new(); + let mut ser = serde_json::Serializer::with_formatter(&mut body, CanonicalFormatter::new()); + manifest.serialize(&mut ser).unwrap(); + + self.push_manifest_raw(image, body, manifest.content_type().parse().unwrap()) + } + + /// Pushes the manifest, provided as raw bytes, for a specified image + /// + /// Returns pullable manifest url + pub fn push_manifest_raw( + &mut self, + image: &Reference, + body: Vec, + content_type: HeaderValue, + ) -> Result { + let url = self.to_v2_manifest_url(image); + debug!(?url, ?content_type, "push manifest"); + + let mut headers = HeaderMap::new(); + headers.insert("Content-Type", content_type); + + // Calculate the digest of the manifest, this is useful + // if the remote registry is violating the OCI Distribution Specification. + // See below for more details. + let manifest_hash = sha256_digest(&body); + + let res = RequestBuilderWrapper::from_client(self, |client| client.put(url.clone())) + .apply_auth(image, RegistryOperation::Push)? + .into_request_builder() + .headers(headers) + .body(body) + .send()?; + + let ret = self.extract_location_header(image, res, &reqwest::StatusCode::CREATED); + + if matches!(ret, Err(OciDistributionError::RegistryNoLocationError)) { + // The registry is violating the OCI Distribution Spec, BUT the OCI + // image/artifact has been uploaded successfully. + // The `Location` header contains the sha256 digest of the manifest, + // we can reuse the value we calculated before. + // The workaround is there because repositories such as + // AWS ECR are violating this aspect of the spec. This at least let the + // oci-distribution users interact with these registries. + warn!("Registry is not respecting the OCI Distribution Specification: it didn't return the Location of the uploaded Manifest inside of the response headers. Working around this issue..."); + + let url_base = url + .strip_suffix(image.tag().unwrap_or("latest")) + .expect("The manifest URL always ends with the image tag suffix"); + let url_by_digest = format!("{}{}", url_base, manifest_hash); + + return Ok(url_by_digest); + } + + ret + } + + /// Pulls the referrers for the given image filtering by the optionally provided artifact type. + pub fn pull_referrers( + &mut self, + image: &Reference, + artifact_type: Option<&str>, + ) -> Result { + let url = self.to_v2_referrers_url(image, artifact_type)?; + debug!("Pulling referrers from {}", url); + + let res = RequestBuilderWrapper::from_client(self, |client| client.get(&url)) + .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)? + .apply_auth(image, RegistryOperation::Pull)? + .into_request_builder() + .send()?; + let status = res.status(); + let body = res.bytes()?; + + validate_registry_response(status, &body, &url)?; + let manifest = serde_json::from_slice(&body) + .map_err(|e| OciDistributionError::ManifestParsingError(e.to_string()))?; + + Ok(manifest) + } + + fn extract_location_header( + &self, + image: &Reference, + res: reqwest::blocking::Response, + expected_status: &reqwest::StatusCode, + ) -> Result { + debug!(expected_status_code=?expected_status.as_u16(), + status_code=?res.status().as_u16(), + "extract location header"); + if res.status().eq(expected_status) { + let location_header = res.headers().get("Location"); + debug!(location=?location_header, "Location header"); + match location_header { + None => Err(OciDistributionError::RegistryNoLocationError), + Some(lh) => self.location_header_to_url(image, lh), + } + } else if res.status().is_success() && expected_status.is_success() { + Err(OciDistributionError::SpecViolationError(format!( + "Expected HTTP Status {}, got {} instead", + expected_status, + res.status(), + ))) + } else { + let url = res.url().to_string(); + let code = res.status().as_u16(); + let message = res.text()?; + Err(OciDistributionError::ServerError { url, code, message }) + } + } + + /// Helper function to convert location header to URL + /// + /// Location may be absolute (containing the protocol and/or hostname), or relative (containing just the URL path) + /// Returns a properly formatted absolute URL + fn location_header_to_url( + &self, + image: &Reference, + location_header: &reqwest::header::HeaderValue, + ) -> Result { + let lh = location_header.to_str()?; + if lh.starts_with("/") { + let registry = image.resolve_registry(); + Ok(format!( + "{scheme}://{registry}{lh}", + scheme = self.config.protocol.scheme_for(registry) + )) + } else { + Ok(lh.to_string()) + } + } + + /// Convert a Reference to a v2 manifest URL. + fn to_v2_manifest_url(&self, reference: &Reference) -> String { + let registry = reference.resolve_registry(); + format!( + "{scheme}://{registry}/v2/{repository}/manifests/{reference}{ns}", + scheme = self.config.protocol.scheme_for(registry), + repository = reference.repository(), + reference = if let Some(digest) = reference.digest() { + digest + } else { + reference.tag().unwrap_or("latest") + }, + ns = reference + .namespace() + .map(|ns| format!("?ns={ns}")) + .unwrap_or_default(), + ) + } + + /// Convert a Reference to a v2 blob (layer) URL. + fn to_v2_blob_url(&self, reference: &Reference, digest: &str) -> String { + let registry = reference.resolve_registry(); + format!( + "{scheme}://{registry}/v2/{repository}/blobs/{digest}{ns}", + scheme = self.config.protocol.scheme_for(registry), + repository = reference.repository(), + ns = reference + .namespace() + .map(|ns| format!("?ns={ns}")) + .unwrap_or_default(), + ) + } + + /// Convert a Reference to a v2 blob upload URL. + fn to_v2_blob_upload_url(&self, reference: &Reference) -> String { + self.to_v2_blob_url(reference, "uploads/") + } + + fn to_list_tags_url(&self, reference: &Reference) -> String { + let registry = reference.resolve_registry(); + format!( + "{scheme}://{registry}/v2/{repository}/tags/list{ns}", + scheme = self.config.protocol.scheme_for(registry), + repository = reference.repository(), + ns = reference + .namespace() + .map(|ns| format!("?ns={ns}")) + .unwrap_or_default(), + ) + } + + /// Convert a Reference to a v2 manifest URL. + fn to_v2_referrers_url( + &self, + reference: &Reference, + artifact_type: Option<&str>, + ) -> Result { + let registry = reference.resolve_registry(); + Ok(format!( + "{scheme}://{registry}/v2/{repository}/referrers/{reference}{at}", + scheme = self.config.protocol.scheme_for(registry), + repository = reference.repository(), + reference = if let Some(digest) = reference.digest() { + digest + } else { + return Err(OciDistributionError::GenericError(Some( + "Getting referrers for a tag is not supported".into(), + ))); + }, + at = artifact_type + .map(|at| format!("?artifactType={at}")) + .unwrap_or_default(), + )) + } +} + +/// The OCI spec technically does not allow any codes but 200, 500, 401, and 404. +/// Obviously, HTTP servers are going to send other codes. This tries to catch the +/// obvious ones (200, 4XX, 5XX). Anything else is just treated as an error. +fn validate_registry_response(status: reqwest::StatusCode, body: &[u8], url: &str) -> Result<()> { + match status { + reqwest::StatusCode::OK => Ok(()), + reqwest::StatusCode::UNAUTHORIZED => Err(OciDistributionError::UnauthorizedError { + url: url.to_string(), + }), + s if s.is_success() => Err(OciDistributionError::SpecViolationError(format!( + "Expected HTTP Status {}, got {} instead", + reqwest::StatusCode::OK, + status, + ))), + s if s.is_client_error() => { + let text = std::str::from_utf8(body)?; + // According to the OCI spec, we should see an error in the message body. + let envelope = serde_json::from_str::(text)?; + Err(OciDistributionError::RegistryError { + envelope, + url: url.to_string(), + }) + } + s => { + let text = std::str::from_utf8(body)?; + + Err(OciDistributionError::ServerError { + code: s.as_u16(), + url: url.to_string(), + message: text.to_string(), + }) + } + } +} + +/// The request builder wrapper allows to be instantiated from a +/// `Client` and allows composable operations on the request builder, +/// to produce a `RequestBuilder` object that can be executed. +struct RequestBuilderWrapper<'a> { + client: &'a mut Client, + request_builder: RequestBuilder, +} + +// RequestBuilderWrapper type management +impl<'a> RequestBuilderWrapper<'a> { + /// Create a `RequestBuilderWrapper` from a `Client` instance, by + /// instantiating the internal `RequestBuilder` with the provided + /// function `f`. + fn from_client( + client: &'a mut Client, + f: impl Fn(&reqwest::blocking::Client) -> RequestBuilder, + ) -> RequestBuilderWrapper<'a> { + let request_builder = f(&client.client); + RequestBuilderWrapper { + client, + request_builder, + } + } + + // Produces a final `RequestBuilder` out of this `RequestBuilderWrapper` + fn into_request_builder(self) -> RequestBuilder { + self.request_builder + } +} + +// Composable functions applicable to a `RequestBuilderWrapper` +impl<'a> RequestBuilderWrapper<'a> { + fn apply_accept(&mut self, accept: &[&str]) -> Result { + let request_builder = self + .request_builder + .try_clone() + .ok_or_else(|| { + OciDistributionError::GenericError(Some( + "could not clone request builder".to_string(), + )) + })? + .header("Accept", Vec::from(accept).join(", ")); + + Ok(RequestBuilderWrapper { + client: self.client, + request_builder, + }) + } + + /// Updates request as necessary for authentication. + /// + /// If the struct has Some(bearer), this will insert the bearer token in an + /// Authorization header. It will also set the Accept header, which must + /// be set on all OCI Registry requests. If the struct has HTTP Basic Auth + /// credentials, these will be configured. + fn apply_auth( + &mut self, + image: &Reference, + op: RegistryOperation, + ) -> Result { + let mut headers = HeaderMap::new(); + + if let Some(token) = self.client.get_auth_token(image, op) { + match token { + RegistryTokenType::Bearer(token) => { + debug!("Using bearer token authentication."); + headers.insert("Authorization", token.bearer_token().parse().unwrap()); + } + RegistryTokenType::Basic(username, password) => { + debug!("Using HTTP basic authentication."); + return Ok(RequestBuilderWrapper { + client: self.client, + request_builder: self + .request_builder + .try_clone() + .ok_or_else(|| { + OciDistributionError::GenericError(Some( + "could not clone request builder".to_string(), + )) + })? + .headers(headers) + .basic_auth(username.to_string(), Some(password.to_string())), + }); + } + } + } + Ok(RequestBuilderWrapper { + client: self.client, + request_builder: self + .request_builder + .try_clone() + .ok_or_else(|| { + OciDistributionError::GenericError(Some( + "could not clone request builder".to_string(), + )) + })? + .headers(headers), + }) + } +} + +/// The encoding of the certificate +#[derive(Debug, Clone)] +pub enum CertificateEncoding { + #[allow(missing_docs)] + Der, + #[allow(missing_docs)] + Pem, +} + +/// A x509 certificate +#[derive(Debug, Clone)] +pub struct Certificate { + /// Which encoding is used by the certificate + pub encoding: CertificateEncoding, + + /// Actual certificate + pub data: Vec, +} + +/// A client configuration +pub struct ClientConfig { + /// Which protocol the client should use + pub protocol: ClientProtocol, + + /// Accept invalid hostname. Defaults to false + #[cfg(feature = "native-tls")] + pub accept_invalid_hostnames: bool, + + /// Accept invalid certificates. Defaults to false + pub accept_invalid_certificates: bool, + + /// Use monolithic push for pushing blobs. Defaults to false + pub use_monolithic_push: bool, + + /// A list of extra root certificate to trust. This can be used to connect + /// to servers using self-signed certificates + pub extra_root_certificates: Vec, + + /// A function that defines the client's behaviour if an Image Index Manifest + /// (i.e Manifest List) is encountered when pulling an image. + /// Defaults to [current_platform_resolver](self::current_platform_resolver), + /// which attempts to choose an image matching the running OS and Arch. + /// + /// If set to None, an error is raised if an Image Index manifest is received + /// during an image pull. + pub platform_resolver: Option>, + + /// Maximum number of concurrent uploads to perform during a `push` + /// operation. + /// + /// This defaults to [`DEFAULT_MAX_CONCURRENT_UPLOAD`]. + pub max_concurrent_upload: usize, + + /// Maximum number of concurrent downloads to perform during a `pull` + /// operation. + /// + /// This defaults to [`DEFAULT_MAX_CONCURRENT_DOWNLOAD`]. + pub max_concurrent_download: usize, + + /// Default token expiration in seconds, to use when the token claim + /// doesn't provide a value. + /// + /// This defaults to [`DEFAULT_TOKEN_EXPIRATION_SECS`]. + pub default_token_expiration_secs: usize, + + /// Enables a read timeout for the client. + /// + /// See [`reqwest::ClientBuilder::read_timeout`] for more information. + pub read_timeout: Option, + + /// Set a timeout for the connect phase for the client. + /// + /// See [`reqwest::ClientBuilder::connect_timeout`] for more information. + pub connect_timeout: Option, + + /// Set the `User-Agent` used by the client. + /// + /// This defaults to [`DEFAULT_USER_AGENT`]. + pub user_agent: &'static str, + + /// Set the `HTTPS PROXY` used by the client. + /// + /// This defaults to `None`. + pub https_proxy: Option, + + /// Set the `NO PROXY` used by the client. + /// + /// This defaults to `None`. + pub no_proxy: Option, +} + +impl Default for ClientConfig { + fn default() -> Self { + Self { + protocol: ClientProtocol::default(), + #[cfg(feature = "native-tls")] + accept_invalid_hostnames: false, + accept_invalid_certificates: false, + use_monolithic_push: false, + extra_root_certificates: Vec::new(), + platform_resolver: Some(Box::new(current_platform_resolver)), + max_concurrent_upload: DEFAULT_MAX_CONCURRENT_UPLOAD, + max_concurrent_download: DEFAULT_MAX_CONCURRENT_DOWNLOAD, + default_token_expiration_secs: DEFAULT_TOKEN_EXPIRATION_SECS, + read_timeout: None, + connect_timeout: None, + user_agent: DEFAULT_USER_AGENT, + https_proxy: None, + no_proxy: None, + } + } +} + +// Be explicit about the traits supported by this type. This is needed to use +// the Client behind a dynamic reference. +// Something similar to what is described here: https://users.rust-lang.org/t/how-to-send-function-closure-to-another-thread/43549 +type PlatformResolverFn = dyn Fn(&[ImageIndexEntry]) -> Option + Send + Sync; + +/// A platform resolver that chooses the first linux/amd64 variant, if present +pub fn linux_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option { + manifests + .iter() + .find(|entry| { + entry + .platform + .as_ref() + .is_some_and(|platform| platform.os == "linux" && platform.architecture == "amd64") + }) + .map(|entry| entry.digest.clone()) +} + +/// A platform resolver that chooses the first windows/amd64 variant, if present +pub fn windows_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option { + manifests + .iter() + .find(|entry| { + entry.platform.as_ref().is_some_and(|platform| { + platform.os == "windows" && platform.architecture == "amd64" + }) + }) + .map(|entry| entry.digest.clone()) +} + +const MACOS: &str = "macos"; +const DARWIN: &str = "darwin"; + +fn go_os() -> &'static str { + // Massage Rust OS var to GO OS: + // - Rust: https://doc.rust-lang.org/std/env/consts/constant.OS.html + // - Go: https://golang.org/doc/install/source#environment + match std::env::consts::OS { + MACOS => DARWIN, + other => other, + } +} + +const X86_64: &str = "x86_64"; +const AMD64: &str = "amd64"; +const X86: &str = "x86"; +const AMD: &str = "amd"; +const ARM64: &str = "arm64"; +const AARCH64: &str = "aarch64"; +const POWERPC64: &str = "powerpc64"; +const PPC64LE: &str = "ppc64le"; + +fn go_arch() -> &'static str { + // Massage Rust Architecture vars to GO equivalent: + // - Rust: https://doc.rust-lang.org/std/env/consts/constant.ARCH.html + // - Go: https://golang.org/doc/install/source#environment + match std::env::consts::ARCH { + X86_64 => AMD64, + X86 => AMD, + AARCH64 => ARM64, + POWERPC64 => PPC64LE, + other => other, + } +} + +/// A platform resolver that chooses the first variant matching the running OS/Arch, if present. +/// Doesn't currently handle platform.variants. +pub fn current_platform_resolver(manifests: &[ImageIndexEntry]) -> Option { + manifests + .iter() + .find(|entry| { + entry.platform.as_ref().map_or(false, |platform| { + platform.os == go_os() && platform.architecture == go_arch() + }) + }) + .map(|entry| entry.digest.clone()) +} + +/// The protocol that the client should use to connect +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub enum ClientProtocol { + #[allow(missing_docs)] + Http, + #[allow(missing_docs)] + #[default] + Https, + #[allow(missing_docs)] + HttpsExcept(Vec), +} + +impl ClientProtocol { + fn scheme_for(&self, registry: &str) -> &str { + match self { + ClientProtocol::Https => "https", + ClientProtocol::Http => "http", + ClientProtocol::HttpsExcept(exceptions) => { + if exceptions.contains(®istry.to_owned()) { + "http" + } else { + "https" + } + } + } + } +} + +#[derive(Clone, Debug)] +struct BearerChallenge { + pub realm: Box, + pub service: Option, +} + +impl TryFrom<&HeaderValue> for BearerChallenge { + type Error = String; + + fn try_from(value: &HeaderValue) -> std::result::Result { + let parser = ChallengeParser::new( + value + .to_str() + .map_err(|e| format!("cannot convert header value to string: {:?}", e))?, + ); + parser + .filter_map(|parser_res| { + if let Ok(chalenge_ref) = parser_res { + let bearer_challenge = BearerChallenge::try_from(&chalenge_ref); + bearer_challenge.ok() + } else { + None + } + }) + .next() + .ok_or_else(|| "Cannot find Bearer challenge".to_string()) + } +} + +impl TryFrom<&ChallengeRef<'_>> for BearerChallenge { + type Error = String; + + fn try_from(value: &ChallengeRef<'_>) -> std::result::Result { + if !value.scheme.eq_ignore_ascii_case("Bearer") { + return Err(format!( + "BearerChallenge doesn't support challenge scheme {:?}", + value.scheme + )); + } + let mut realm = None; + let mut service = None; + for (k, v) in &value.params { + if k.eq_ignore_ascii_case("realm") { + realm = Some(v.to_unescaped()); + } + + if k.eq_ignore_ascii_case("service") { + service = Some(v.to_unescaped()); + } + } + + let realm = realm.ok_or("missing required parameter realm")?; + + Ok(BearerChallenge { + realm: realm.into_boxed_str(), + service, + }) + } +} + +#[cfg(test)] +mod test { + use super::*; + use std::collections::BTreeMap; + use std::convert::TryFrom; + use std::fs; + use std::path; + use std::result::Result; + + use rstest::rstest; + use sha2::Digest as _; + use tempfile::TempDir; + use testcontainers::{ + core::{Mount, WaitFor}, + runners::SyncRunner as _, + ContainerRequest, GenericImage, ImageExt as _, + }; + + use crate::manifest::{self, IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE}; + + const HELLO_IMAGE_NO_TAG: &str = "webassembly.azurecr.io/hello-wasm"; + const HELLO_IMAGE_TAG: &str = "webassembly.azurecr.io/hello-wasm:v1"; + const HELLO_IMAGE_DIGEST: &str = "webassembly.azurecr.io/hello-wasm@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7"; + const HELLO_IMAGE_TAG_AND_DIGEST: &str = "webassembly.azurecr.io/hello-wasm:v1@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7"; + const TEST_IMAGES: &[&str] = &[ + // TODO(jlegrone): this image cannot be pulled currently because no `latest` + // tag exists on the image repository. Re-enable this image + // in tests once `latest` is published. + // HELLO_IMAGE_NO_TAG, + HELLO_IMAGE_TAG, + HELLO_IMAGE_DIGEST, + HELLO_IMAGE_TAG_AND_DIGEST, + ]; + const GHCR_IO_IMAGE: &str = "ghcr.io/krustlet/oci-distribution/hello-wasm:v1"; + const DOCKER_IO_IMAGE: &str = "docker.io/library/hello-world@sha256:37a0b92b08d4919615c3ee023f7ddb068d12b8387475d64c622ac30f45c29c51"; + const HTPASSWD: &str = "testuser:$2y$05$8/q2bfRcX74EuxGf0qOcSuhWDQJXrgWiy6Fi73/JM2tKC66qSrLve"; + const HTPASSWD_USERNAME: &str = "testuser"; + const HTPASSWD_PASSWORD: &str = "testpassword"; + + #[test] + fn test_apply_accept() -> anyhow::Result<()> { + assert_eq!( + RequestBuilderWrapper::from_client(&mut Client::default(), |client| client + .get("https://example.com/some/module.wasm")) + .apply_accept(&["*/*"])? + .into_request_builder() + .build()? + .headers()["Accept"], + "*/*" + ); + + assert_eq!( + RequestBuilderWrapper::from_client(&mut Client::default(), |client| client + .get("https://example.com/some/module.wasm")) + .apply_accept(MIME_TYPES_DISTRIBUTION_MANIFEST)? + .into_request_builder() + .build()? + .headers()["Accept"], + MIME_TYPES_DISTRIBUTION_MANIFEST.join(", ") + ); + + Ok(()) + } + + #[test] + fn test_apply_auth_no_token() -> anyhow::Result<()> { + assert!( + !RequestBuilderWrapper::from_client(&mut Client::default(), |client| client + .get("https://example.com/some/module.wasm")) + .apply_auth( + &Reference::try_from(HELLO_IMAGE_TAG)?, + RegistryOperation::Pull + )? + .into_request_builder() + .build()? + .headers() + .contains_key("Authorization") + ); + + Ok(()) + } + + #[test] + fn test_apply_auth_bearer_token() -> anyhow::Result<()> { + use hmac::{Hmac, Mac}; + use jwt::SignWithKey; + use sha2::Sha256; + let mut client = Client::default(); + let header = jwt::header::Header { + algorithm: jwt::algorithm::AlgorithmType::Hs256, + key_id: None, + type_: None, + content_type: None, + }; + let claims: jwt::claims::Claims = Default::default(); + let key: Hmac = Hmac::new_from_slice(b"some-secret").unwrap(); + let token = jwt::Token::new(header, claims) + .sign_with_key(&key)? + .as_str() + .to_string(); + + // we have to have it in the stored auth so we'll get to the token cache check. + client.store_auth( + Reference::try_from(HELLO_IMAGE_TAG)?.resolve_registry(), + RegistryAuth::Anonymous, + ); + + client.tokens.insert( + &Reference::try_from(HELLO_IMAGE_TAG)?, + RegistryOperation::Pull, + RegistryTokenType::Bearer(RegistryToken::Token { + token: token.clone(), + }), + ); + assert_eq!( + RequestBuilderWrapper::from_client(&mut client, |client| client + .get("https://example.com/some/module.wasm")) + .apply_auth( + &Reference::try_from(HELLO_IMAGE_TAG)?, + RegistryOperation::Pull + )? + .into_request_builder() + .build()? + .headers()["Authorization"], + format!("Bearer {}", &token) + ); + + Ok(()) + } + + #[test] + fn test_to_v2_blob_url() { + let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference"); + let c = Client::default(); + + assert_eq!( + c.to_v2_blob_url(&image, "sha256:deadbeef"), + "https://webassembly.azurecr.io/v2/hello-wasm/blobs/sha256:deadbeef" + ); + + image.set_mirror_registry("docker.mirror.io".to_owned()); + assert_eq!( + c.to_v2_blob_url(&image, "sha256:deadbeef"), + "https://docker.mirror.io/v2/hello-wasm/blobs/sha256:deadbeef?ns=webassembly.azurecr.io" + ); + } + + #[rstest(image, expected_uri, expected_mirror_uri, + case(HELLO_IMAGE_NO_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/latest", "https://docker.mirror.io/v2/hello-wasm/manifests/latest?ns=webassembly.azurecr.io"), // TODO: confirm this is the right translation when no tag + case(HELLO_IMAGE_TAG, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/v1", "https://docker.mirror.io/v2/hello-wasm/manifests/v1?ns=webassembly.azurecr.io"), + case(HELLO_IMAGE_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7", "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"), + case(HELLO_IMAGE_TAG_AND_DIGEST, "https://webassembly.azurecr.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7", "https://docker.mirror.io/v2/hello-wasm/manifests/sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7?ns=webassembly.azurecr.io"), + )] + fn test_to_v2_manifest(image: &str, expected_uri: &str, expected_mirror_uri: &str) { + let mut reference = Reference::try_from(image).expect("failed to parse reference"); + let c = Client::default(); + assert_eq!(c.to_v2_manifest_url(&reference), expected_uri); + + reference.set_mirror_registry("docker.mirror.io".to_owned()); + assert_eq!(c.to_v2_manifest_url(&reference), expected_mirror_uri); + } + + #[test] + fn test_to_v2_blob_upload_url() { + let image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference"); + let blob_url = Client::default().to_v2_blob_upload_url(&image); + + assert_eq!( + blob_url, + "https://webassembly.azurecr.io/v2/hello-wasm/blobs/uploads/" + ) + } + + #[test] + fn test_to_list_tags_url() { + let mut image = Reference::try_from(HELLO_IMAGE_TAG).expect("failed to parse reference"); + let c = Client::default(); + + assert_eq!( + c.to_list_tags_url(&image), + "https://webassembly.azurecr.io/v2/hello-wasm/tags/list" + ); + + image.set_mirror_registry("docker.mirror.io".to_owned()); + assert_eq!( + c.to_list_tags_url(&image), + "https://docker.mirror.io/v2/hello-wasm/tags/list?ns=webassembly.azurecr.io" + ); + } + + #[test] + fn manifest_url_generation_respects_http_protocol() { + let c = Client::new(ClientConfig { + protocol: ClientProtocol::Http, + ..Default::default() + }); + let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned()) + .expect("Could not parse reference"); + assert_eq!( + "http://webassembly.azurecr.io/v2/hello/manifests/v1", + c.to_v2_manifest_url(&reference) + ); + } + + #[test] + fn blob_url_generation_respects_http_protocol() { + let c = Client::new(ClientConfig { + protocol: ClientProtocol::Http, + ..Default::default() + }); + let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned()) + .expect("Could not parse reference"); + assert_eq!( + "http://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", + c.to_v2_blob_url(&reference, reference.digest().unwrap()) + ); + } + + #[test] + fn manifest_url_generation_uses_https_if_not_on_exception_list() { + let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()]; + let protocol = ClientProtocol::HttpsExcept(insecure_registries); + let c = Client::new(ClientConfig { + protocol, + ..Default::default() + }); + let reference = Reference::try_from("webassembly.azurecr.io/hello:v1".to_owned()) + .expect("Could not parse reference"); + assert_eq!( + "https://webassembly.azurecr.io/v2/hello/manifests/v1", + c.to_v2_manifest_url(&reference) + ); + } + + #[test] + fn manifest_url_generation_uses_http_if_on_exception_list() { + let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()]; + let protocol = ClientProtocol::HttpsExcept(insecure_registries); + let c = Client::new(ClientConfig { + protocol, + ..Default::default() + }); + let reference = Reference::try_from("oci.registry.local/hello:v1".to_owned()) + .expect("Could not parse reference"); + assert_eq!( + "http://oci.registry.local/v2/hello/manifests/v1", + c.to_v2_manifest_url(&reference) + ); + } + + #[test] + fn blob_url_generation_uses_https_if_not_on_exception_list() { + let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()]; + let protocol = ClientProtocol::HttpsExcept(insecure_registries); + let c = Client::new(ClientConfig { + protocol, + ..Default::default() + }); + let reference = Reference::try_from("webassembly.azurecr.io/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned()) + .expect("Could not parse reference"); + assert_eq!( + "https://webassembly.azurecr.io/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", + c.to_v2_blob_url(&reference, reference.digest().unwrap()) + ); + } + + #[test] + fn blob_url_generation_uses_http_if_on_exception_list() { + let insecure_registries = vec!["localhost".to_owned(), "oci.registry.local".to_owned()]; + let protocol = ClientProtocol::HttpsExcept(insecure_registries); + let c = Client::new(ClientConfig { + protocol, + ..Default::default() + }); + let reference = Reference::try_from("oci.registry.local/hello@sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".to_owned()) + .expect("Could not parse reference"); + assert_eq!( + "http://oci.registry.local/v2/hello/blobs/sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", + c.to_v2_blob_url(&reference, reference.digest().unwrap()) + ); + } + + #[test] + fn can_generate_valid_digest() { + let bytes = b"hellobytes"; + let hash = sha256_digest(bytes); + + let combination = vec![b"hello".to_vec(), b"bytes".to_vec()]; + let combination_hash = + sha256_digest(&combination.into_iter().flatten().collect::>()); + + assert_eq!( + hash, + "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99" + ); + assert_eq!( + combination_hash, + "sha256:fdbd95aafcbc814a2600fcc54c1e1706f52d2f9bf45cf53254f25bcd7599ce99" + ); + } + + #[test] + fn test_registry_token_deserialize() { + // 'token' field, standalone + let text = r#"{"token": "abc"}"#; + let res: Result = serde_json::from_str(text); + assert!(res.is_ok()); + let rt = res.unwrap(); + assert_eq!(rt.token(), "abc"); + + // 'access_token' field, standalone + let text = r#"{"access_token": "xyz"}"#; + let res: Result = serde_json::from_str(text); + assert!(res.is_ok()); + let rt = res.unwrap(); + assert_eq!(rt.token(), "xyz"); + + // both 'token' and 'access_token' fields, 'token' field takes precedence + let text = r#"{"access_token": "xyz", "token": "abc"}"#; + let res: Result = serde_json::from_str(text); + assert!(res.is_ok()); + let rt = res.unwrap(); + assert_eq!(rt.token(), "abc"); + + // both 'token' and 'access_token' fields, 'token' field takes precedence (reverse order) + let text = r#"{"token": "abc", "access_token": "xyz"}"#; + let res: Result = serde_json::from_str(text); + assert!(res.is_ok()); + let rt = res.unwrap(); + assert_eq!(rt.token(), "abc"); + + // non-string fields do not break parsing + let text = r#"{"aaa": 300, "access_token": "xyz", "token": "abc", "zzz": 600}"#; + let res: Result = serde_json::from_str(text); + assert!(res.is_ok()); + + // Note: tokens should always be strings. The next two tests ensure that if one field + // is invalid (integer), then parse can still succeed if the other field is a string. + // + // numeric 'access_token' field, but string 'token' field does not in parse error + let text = r#"{"access_token": 300, "token": "abc"}"#; + let res: Result = serde_json::from_str(text); + assert!(res.is_ok()); + let rt = res.unwrap(); + assert_eq!(rt.token(), "abc"); + + // numeric 'token' field, but string 'accesss_token' field does not in parse error + let text = r#"{"access_token": "xyz", "token": 300}"#; + let res: Result = serde_json::from_str(text); + assert!(res.is_ok()); + let rt = res.unwrap(); + assert_eq!(rt.token(), "xyz"); + + // numeric 'token' field results in parse error + let text = r#"{"token": 300}"#; + let res: Result = serde_json::from_str(text); + assert!(res.is_err()); + + // numeric 'access_token' field results in parse error + let text = r#"{"access_token": 300}"#; + let res: Result = serde_json::from_str(text); + assert!(res.is_err()); + + // object 'token' field results in parse error + let text = r#"{"token": {"some": "thing"}}"#; + let res: Result = serde_json::from_str(text); + assert!(res.is_err()); + + // object 'access_token' field results in parse error + let text = r#"{"access_token": {"some": "thing"}}"#; + let res: Result = serde_json::from_str(text); + assert!(res.is_err()); + + // missing fields results in parse error + let text = r#"{"some": "thing"}"#; + let res: Result = serde_json::from_str(text); + assert!(res.is_err()); + + // bad JSON results in parse error + let text = r#"{"token": "abc""#; + let res: Result = serde_json::from_str(text); + assert!(res.is_err()); + + // worse JSON results in parse error + let text = r#"_ _ _ kjbwef??98{9898 }} }}"#; + let res: Result = serde_json::from_str(text); + assert!(res.is_err()); + } + + fn check_auth_token(token: &str) { + // We test that the token is longer than a minimal hash. + assert!(token.len() > 64); + } + + #[test] + fn test_auth() { + for &image in TEST_IMAGES { + let reference = Reference::try_from(image).expect("failed to parse reference"); + let mut c = Client::default(); + let token = c + .auth( + &reference, + &RegistryAuth::Anonymous, + RegistryOperation::Pull, + ) + .expect("result from auth request"); + + assert!(token.is_some()); + check_auth_token(token.unwrap().as_ref()); + + let tok = c + .tokens + .get(&reference, RegistryOperation::Pull) + .expect("token is available"); + // We test that the token is longer than a minimal hash. + if let RegistryTokenType::Bearer(tok) = tok { + check_auth_token(tok.token()); + } else { + panic!("Unexpeted Basic Auth Token"); + } + } + } + + #[test] + #[cfg_attr(not(feature = "test-registry"), ignore)] + fn test_list_tags() { + let test_container = registry_image_edge() + .start() + .expect("Failed to start registry container"); + let port = test_container + .get_host_port_ipv4(5000) + .expect("Failed to get port"); + let auth = + RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string()); + + let mut client = Client::new(ClientConfig { + protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]), + ..Default::default() + }); + + let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap(); + client + .auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull) + .expect("cannot authenticate against registry for pull operation"); + + let (manifest, _digest) = client + ._pull_image_manifest(&image) + .expect("failed to pull manifest"); + + let image_data = client + .pull(&image, &auth, vec![manifest::WASM_LAYER_MEDIA_TYPE]) + .expect("failed to pull image"); + + for i in 0..=3 { + let push_image: Reference = format!("localhost:{}/hello-wasm:1.0.{}", port, i) + .parse() + .unwrap(); + client + .auth(&push_image, &auth, RegistryOperation::Push) + .expect("authenticated"); + client + .push( + &push_image, + &image_data.layers, + image_data.config.clone(), + &auth, + Some(manifest.clone()), + ) + .expect("Failed to push Image"); + } + + let image: Reference = format!("localhost:{}/hello-wasm:1.0.1", port) + .parse() + .unwrap(); + let response = client + .list_tags(&image, &RegistryAuth::Anonymous, Some(2), Some("1.0.1")) + .expect("Cannot list Tags"); + assert_eq!(response.tags, vec!["1.0.2", "1.0.3"]) + } + + #[test] + fn test_pull_manifest_private() { + for &image in TEST_IMAGES { + let reference = Reference::try_from(image).expect("failed to parse reference"); + // Currently, pull_manifest does not perform Authz, so this will fail. + let mut c = Client::default(); + c._pull_image_manifest(&reference) + .expect_err("pull manifest should fail"); + + // But this should pass + let mut c = Client::default(); + c.auth( + &reference, + &RegistryAuth::Anonymous, + RegistryOperation::Pull, + ) + .expect("authenticated"); + let (manifest, _) = c + ._pull_image_manifest(&reference) + .expect("pull manifest should not fail"); + + // The test on the manifest checks all fields. This is just a brief sanity check. + assert_eq!(manifest.schema_version, 2); + assert!(!manifest.layers.is_empty()); + } + } + + #[test] + fn test_pull_manifest_public() { + for &image in TEST_IMAGES { + let reference = Reference::try_from(image).expect("failed to parse reference"); + let mut c = Client::default(); + let (manifest, _) = c + .pull_image_manifest(&reference, &RegistryAuth::Anonymous) + .expect("pull manifest should not fail"); + + // The test on the manifest checks all fields. This is just a brief sanity check. + assert_eq!(manifest.schema_version, 2); + assert!(!manifest.layers.is_empty()); + } + } + + #[test] + fn pull_manifest_and_config_public() { + for &image in TEST_IMAGES { + let reference = Reference::try_from(image).expect("failed to parse reference"); + let mut c = Client::default(); + let (manifest, _, config) = c + .pull_manifest_and_config(&reference, &RegistryAuth::Anonymous) + .expect("pull manifest and config should not fail"); + + // The test on the manifest checks all fields. This is just a brief sanity check. + assert_eq!(manifest.schema_version, 2); + assert!(!manifest.layers.is_empty()); + assert!(!config.is_empty()); + } + } + + #[test] + fn test_fetch_digest() { + let mut c = Client::default(); + + for &image in TEST_IMAGES { + let reference = Reference::try_from(image).expect("failed to parse reference"); + c.fetch_manifest_digest(&reference, &RegistryAuth::Anonymous) + .expect("pull manifest should not fail"); + + // This should pass + let reference = Reference::try_from(image).expect("failed to parse reference"); + let mut c = Client::default(); + c.auth( + &reference, + &RegistryAuth::Anonymous, + RegistryOperation::Pull, + ) + .expect("authenticated"); + let digest = c + .fetch_manifest_digest(&reference, &RegistryAuth::Anonymous) + .expect("pull manifest should not fail"); + + assert_eq!( + digest, + "sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7" + ); + } + } + + #[test] + fn test_pull_blob() { + let mut c = Client::default(); + + for &image in TEST_IMAGES { + let reference = Reference::try_from(image).expect("failed to parse reference"); + c.auth( + &reference, + &RegistryAuth::Anonymous, + RegistryOperation::Pull, + ) + .expect("authenticated"); + let (manifest, _) = c + ._pull_image_manifest(&reference) + .expect("failed to pull manifest"); + + // Pull one specific layer + let mut file: Vec = Vec::new(); + let layer0 = &manifest.layers[0]; + + // This call likes to flake, so we try it at least 5 times + let mut last_error = None; + for i in 1..6 { + if let Err(e) = c.pull_blob(&reference, layer0, &mut file) { + println!( + "Got error on pull_blob call attempt {}. Will retry in 1s: {:?}", + i, e + ); + last_error.replace(e); + std::thread::sleep(std::time::Duration::from_secs(1)); + } else { + last_error = None; + break; + } + } + + if let Some(e) = last_error { + panic!("Unable to pull layer: {:?}", e); + } + + // The manifest says how many bytes we should expect. + assert_eq!(file.len(), layer0.size as usize); + } + } + + #[test] + fn test_pull() { + for &image in TEST_IMAGES { + let reference = Reference::try_from(image).expect("failed to parse reference"); + + // This call likes to flake, so we try it at least 5 times + let mut last_error = None; + let mut image_data = None; + for i in 1..6 { + match Client::default().pull( + &reference, + &RegistryAuth::Anonymous, + vec![manifest::WASM_LAYER_MEDIA_TYPE], + ) { + Ok(data) => { + image_data = Some(data); + last_error = None; + break; + } + Err(e) => { + println!( + "Got error on pull call attempt {}. Will retry in 1s: {:?}", + i, e + ); + last_error.replace(e); + std::thread::sleep(std::time::Duration::from_secs(1)); + } + } + } + + if let Some(e) = last_error { + panic!("Unable to pull layer: {:?}", e); + } + + assert!(image_data.is_some()); + let image_data = image_data.unwrap(); + assert!(!image_data.layers.is_empty()); + assert!(image_data.digest.is_some()); + } + } + + /// Attempting to pull an image without any layer validation should fail. + #[test] + fn test_pull_without_layer_validation() { + for &image in TEST_IMAGES { + let reference = Reference::try_from(image).expect("failed to parse reference"); + assert!(Client::default() + .pull(&reference, &RegistryAuth::Anonymous, vec![],) + .is_err()); + } + } + + /// Attempting to pull an image with the wrong list of layer validations should fail. + #[test] + fn test_pull_wrong_layer_validation() { + for &image in TEST_IMAGES { + let reference = Reference::try_from(image).expect("failed to parse reference"); + assert!(Client::default() + .pull(&reference, &RegistryAuth::Anonymous, vec!["text/plain"],) + .is_err()); + } + } + + // This is the latest build of distribution/distribution from the `main` branch + // Until distribution v3 is relased, this is the only way to have this fix + // https://github.com/distribution/distribution/pull/3143 + // + // We require this fix only when testing the capability to list tags + fn registry_image_edge() -> GenericImage { + GenericImage::new("distribution/distribution", "edge") + .with_wait_for(WaitFor::message_on_stderr("listening on ")) + } + + fn registry_image() -> GenericImage { + GenericImage::new("docker.io/library/registry", "2") + .with_wait_for(WaitFor::message_on_stderr("listening on ")) + } + + fn registry_image_basic_auth(auth_path: &str) -> ContainerRequest { + GenericImage::new("docker.io/library/registry", "2") + .with_wait_for(WaitFor::message_on_stderr("listening on ")) + .with_env_var("REGISTRY_AUTH", "htpasswd") + .with_env_var("REGISTRY_AUTH_HTPASSWD_REALM", "Registry Realm") + .with_env_var("REGISTRY_AUTH_HTPASSWD_PATH", "/auth/htpasswd") + .with_mount(Mount::bind_mount(auth_path, "/auth")) + } + + #[test] + #[cfg_attr(not(feature = "test-registry"), ignore)] + fn can_push_chunk() { + let test_container = registry_image() + .start() + .expect("Failed to start registry container"); + let port = test_container + .get_host_port_ipv4(5000) + .expect("Failed to get port"); + + let mut c = Client::new(ClientConfig { + protocol: ClientProtocol::Http, + ..Default::default() + }); + let url = format!("localhost:{}/hello-wasm:v1", port); + let image: Reference = url.parse().unwrap(); + + c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push) + .expect("result from auth request"); + + let location = c + .begin_push_chunked_session(&image) + .expect("failed to begin push session"); + + let image_data: Vec> = vec![b"iamawebassemblymodule".to_vec()]; + + let (next_location, next_byte) = c + .push_chunk(&location, &image, &image_data[0], 0) + .expect("failed to push layer"); + + // Location should include original URL with at session ID appended + assert!(next_location.len() >= url.len() + "6987887f-0196-45ee-91a1-2dfad901bea0".len()); + assert_eq!( + next_byte, + "iamawebassemblymodule".to_string().into_bytes().len() + ); + + let layer_location = c + .end_push_chunked_session(&next_location, &image, &sha256_digest(&image_data[0])) + .expect("failed to end push session"); + + assert_eq!(layer_location, format!("http://localhost:{}/v2/hello-wasm/blobs/sha256:6165c4ad43c0803798b6f2e49d6348c915d52c999a5f890846cee77ea65d230b", port)); + } + + #[test] + #[cfg_attr(not(feature = "test-registry"), ignore)] + fn can_push_multiple_chunks() { + let test_container = registry_image() + .start() + .expect("Failed to start registry container"); + let port = test_container + .get_host_port_ipv4(5000) + .expect("Failed to get port"); + + let mut c = Client::new(ClientConfig { + protocol: ClientProtocol::Http, + ..Default::default() + }); + // set a super small chunk size - done to force multiple pushes + c.push_chunk_size = 3; + let url = format!("localhost:{}/hello-wasm:v1", port); + let image: Reference = url.parse().unwrap(); + + c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push) + .expect("result from auth request"); + + let image_data: Vec = + b"i am a big webassembly mode that needs chunked uploads".to_vec(); + let image_digest = sha256_digest(&image_data); + + let location = c + .push_blob_chunked(&image, &image_data, &image_digest) + .expect("failed to begin push session"); + + assert_eq!( + location, + format!( + "http://localhost:{}/v2/hello-wasm/blobs/{}", + port, image_digest + ) + ); + } + + #[test] + #[cfg_attr(not(feature = "test-registry"), ignore)] + fn test_image_roundtrip_anon_auth() { + let test_container = registry_image() + .start() + .expect("Failed to start registry container"); + + test_image_roundtrip(&RegistryAuth::Anonymous, &test_container); + } + + #[test] + #[cfg_attr(not(feature = "test-registry"), ignore)] + fn test_image_roundtrip_basic_auth() { + let auth_dir = TempDir::new().expect("cannot create tmp directory"); + let htpasswd_path = path::Path::join(auth_dir.path(), "htpasswd"); + fs::write(htpasswd_path, HTPASSWD).expect("cannot write htpasswd file"); + + let image = registry_image_basic_auth( + auth_dir + .path() + .to_str() + .expect("cannot convert htpasswd_path to string"), + ); + let test_container = image.start().expect("cannot registry container"); + + let auth = + RegistryAuth::Basic(HTPASSWD_USERNAME.to_string(), HTPASSWD_PASSWORD.to_string()); + + test_image_roundtrip(&auth, &test_container); + } + + fn test_image_roundtrip( + registry_auth: &RegistryAuth, + test_container: &testcontainers::Container, + ) { + let _ = tracing_subscriber::fmt::try_init(); + let port = test_container + .get_host_port_ipv4(5000) + .expect("Failed to get port"); + + let mut c = Client::new(ClientConfig { + protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]), + ..Default::default() + }); + + // pulling webassembly.azurecr.io/hello-wasm:v1 + let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap(); + c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull) + .expect("cannot authenticate against registry for pull operation"); + + let (manifest, _digest) = c + ._pull_image_manifest(&image) + .expect("failed to pull manifest"); + + let image_data = c + .pull(&image, registry_auth, vec![manifest::WASM_LAYER_MEDIA_TYPE]) + .expect("failed to pull image"); + + let push_image: Reference = format!("localhost:{}/hello-wasm:v1", port).parse().unwrap(); + c.auth(&push_image, registry_auth, RegistryOperation::Push) + .expect("authenticated"); + + c.push( + &push_image, + &image_data.layers, + image_data.config.clone(), + registry_auth, + Some(manifest.clone()), + ) + .expect("failed to push image"); + + let pulled_image_data = c + .pull( + &push_image, + registry_auth, + vec![manifest::WASM_LAYER_MEDIA_TYPE], + ) + .expect("failed to pull pushed image"); + + let (pulled_manifest, _digest) = c + ._pull_image_manifest(&push_image) + .expect("failed to pull pushed image manifest"); + + assert!(image_data.layers.len() == 1); + assert!(pulled_image_data.layers.len() == 1); + assert_eq!( + image_data.layers[0].data.len(), + pulled_image_data.layers[0].data.len() + ); + assert_eq!(image_data.layers[0].data, pulled_image_data.layers[0].data); + + assert_eq!(manifest.media_type, pulled_manifest.media_type); + assert_eq!(manifest.schema_version, pulled_manifest.schema_version); + assert_eq!(manifest.config.digest, pulled_manifest.config.digest); + } + + #[test] + fn test_raw_manifest_digest() { + let _ = tracing_subscriber::fmt::try_init(); + + let mut c = Client::default(); + + // pulling webassembly.azurecr.io/hello-wasm:v1@sha256:51d9b231d5129e3ffc267c9d455c49d789bf3167b611a07ab6e4b3304c96b0e7 + let image: Reference = HELLO_IMAGE_TAG_AND_DIGEST.parse().unwrap(); + c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Pull) + .expect("cannot authenticate against registry for pull operation"); + + let (manifest, _) = c + .pull_manifest_raw( + &image, + &RegistryAuth::Anonymous, + MIME_TYPES_DISTRIBUTION_MANIFEST, + ) + .expect("failed to pull manifest"); + + // Compute the digest of the returned manifest text. + let digest = sha2::Sha256::digest(manifest); + let hex = format!("sha256:{:x}", digest); + + // Validate that the computed digest and the digest in the pulled reference match. + assert_eq!(image.digest().unwrap(), hex); + } + + #[test] + #[cfg_attr(not(feature = "test-registry"), ignore)] + fn test_mount() { + // initialize the registry + + use crate::manifest::OciDescriptor; + let test_container = registry_image().start().expect("Failed to start registry"); + let port = test_container + .get_host_port_ipv4(5000) + .expect("Failed to get port"); + + let mut c = Client::new(ClientConfig { + protocol: ClientProtocol::HttpsExcept(vec![format!("localhost:{}", port)]), + ..Default::default() + }); + + // Create a dummy layer and push it to `layer-repository` + let layer_reference: Reference = format!("localhost:{}/layer-repository", port) + .parse() + .unwrap(); + let layer_data = vec![1u8, 2, 3, 4]; + let layer = OciDescriptor { + digest: sha256_digest(&layer_data), + ..Default::default() + }; + c.push_blob(&layer_reference, &[1, 2, 3, 4], &layer.digest) + .expect("Failed to push"); + + // Mount the layer at `image-repository` + let image_reference: Reference = format!("localhost:{}/image-repository", port) + .parse() + .unwrap(); + c.mount_blob(&image_reference, &layer_reference, &layer.digest) + .expect("Failed to mount"); + + // Pull the layer from `image-repository` + let mut buf = Vec::new(); + c.pull_blob(&image_reference, &layer, &mut buf) + .expect("Failed to pull"); + + assert_eq!(layer_data, buf); + } + + #[test] + fn test_platform_resolution() { + // test that we get an error when we pull a manifest list + let reference = Reference::try_from(DOCKER_IO_IMAGE).expect("failed to parse reference"); + let mut c = Client::new(ClientConfig { + platform_resolver: None, + ..Default::default() + }); + let err = c + .pull_image_manifest(&reference, &RegistryAuth::Anonymous) + .unwrap_err(); + assert_eq!( + format!("{}", err), + "Received Image Index/Manifest List, but platform_resolver was not defined on the client config. Consider setting platform_resolver" + ); + + c = Client::new(ClientConfig { + platform_resolver: Some(Box::new(linux_amd64_resolver)), + ..Default::default() + }); + let (_manifest, digest) = c + .pull_image_manifest(&reference, &RegistryAuth::Anonymous) + .expect("Couldn't pull manifest"); + assert_eq!( + digest, + "sha256:f54a58bc1aac5ea1a25d796ae155dc228b3f0e11d046ae276b39c4bf2f13d8c4" + ); + } + + #[test] + fn test_pull_ghcr_io() { + let reference = Reference::try_from(GHCR_IO_IMAGE).expect("failed to parse reference"); + let mut c = Client::default(); + let (manifest, _manifest_str) = c + .pull_image_manifest(&reference, &RegistryAuth::Anonymous) + .unwrap(); + assert_eq!(manifest.config.media_type, manifest::WASM_CONFIG_MEDIA_TYPE); + } + + #[test] + #[ignore] + fn test_roundtrip_multiple_layers() { + let _ = tracing_subscriber::fmt::try_init(); + let mut c = Client::new(ClientConfig { + protocol: ClientProtocol::HttpsExcept(vec!["oci.registry.local".to_string()]), + ..Default::default() + }); + let src_image = Reference::try_from("registry:2.7.1").expect("failed to parse reference"); + let dest_image = Reference::try_from("oci.registry.local/registry:roundtrip-test") + .expect("failed to parse reference"); + + let image = c + .pull( + &src_image, + &RegistryAuth::Anonymous, + vec![IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE], + ) + .expect("Failed to pull manifest"); + assert!(image.layers.len() > 1); + + let ImageData { + layers, + config, + manifest, + .. + } = image; + c.push( + &dest_image, + &layers, + config, + &RegistryAuth::Anonymous, + manifest, + ) + .expect("Failed to pull manifest"); + + c.pull_image_manifest(&dest_image, &RegistryAuth::Anonymous) + .expect("Failed to pull manifest"); + } + + #[test] + fn test_hashable_image_layer() { + use itertools::Itertools; + + // First two should be identical; others differ + let image_layers = Vec::from([ + ImageLayer { + data: Vec::from([0, 1, 2, 3]), + media_type: "media_type".to_owned(), + annotations: Some(BTreeMap::from([ + ("0".to_owned(), "1".to_owned()), + ("2".to_owned(), "3".to_owned()), + ])), + }, + ImageLayer { + data: Vec::from([0, 1, 2, 3]), + media_type: "media_type".to_owned(), + annotations: Some(BTreeMap::from([ + ("2".to_owned(), "3".to_owned()), + ("0".to_owned(), "1".to_owned()), + ])), + }, + ImageLayer { + data: Vec::from([0, 1, 2, 3]), + media_type: "different_media_type".to_owned(), + annotations: Some(BTreeMap::from([ + ("0".to_owned(), "1".to_owned()), + ("2".to_owned(), "3".to_owned()), + ])), + }, + ImageLayer { + data: Vec::from([0, 1, 2]), + media_type: "media_type".to_owned(), + annotations: Some(BTreeMap::from([ + ("0".to_owned(), "1".to_owned()), + ("2".to_owned(), "3".to_owned()), + ])), + }, + ImageLayer { + data: Vec::from([0, 1, 2, 3]), + media_type: "media_type".to_owned(), + annotations: Some(BTreeMap::from([ + ("1".to_owned(), "0".to_owned()), + ("2".to_owned(), "3".to_owned()), + ])), + }, + ]); + + assert_eq!( + &image_layers[0], &image_layers[1], + "image_layers[0] should equal image_layers[1]" + ); + assert_ne!( + &image_layers[0], &image_layers[2], + "image_layers[0] should not equal image_layers[2]" + ); + assert_ne!( + &image_layers[0], &image_layers[3], + "image_layers[0] should not equal image_layers[3]" + ); + assert_ne!( + &image_layers[0], &image_layers[4], + "image_layers[0] should not equal image_layers[4]" + ); + assert_ne!( + &image_layers[2], &image_layers[3], + "image_layers[2] should not equal image_layers[3]" + ); + assert_ne!( + &image_layers[2], &image_layers[4], + "image_layers[2] should not equal image_layers[4]" + ); + assert_ne!( + &image_layers[3], &image_layers[4], + "image_layers[3] should not equal image_layers[4]" + ); + + let deduped: Vec = image_layers.clone().into_iter().unique().collect(); + assert_eq!( + image_layers.len() - 1, + deduped.len(), + "after deduplication, there should be one less image layer" + ); + } +} diff --git a/src/client.rs b/src/client.rs index 6d47e3ab..8e8f8d07 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,7 +1,6 @@ //! OCI distribution client for fetching oci images from an OCI compliant remote store -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::convert::TryFrom; -use std::hash::Hash; use std::sync::Arc; use std::time::Duration; @@ -13,19 +12,18 @@ use http_auth::{parser::ChallengeParser, ChallengeRef}; use olpc_cjson::CanonicalFormatter; use reqwest::header::HeaderMap; use reqwest::{NoProxy, Proxy, RequestBuilder, Response, Url}; -use serde::Deserialize; use serde::Serialize; use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::sync::RwLock; use tracing::{debug, trace, warn}; pub use crate::blob::*; -use crate::config::ConfigFile; +pub use crate::types::*; + use crate::digest::{digest_header_value, validate_digest, Digest, Digester}; use crate::errors::*; use crate::manifest::{ - ImageIndexEntry, OciDescriptor, OciImageIndex, OciImageManifest, OciManifest, Versioned, - IMAGE_CONFIG_MEDIA_TYPE, IMAGE_LAYER_GZIP_MEDIA_TYPE, IMAGE_LAYER_MEDIA_TYPE, + ImageIndexEntry, OciImageIndex, OciImageManifest, OciManifest, Versioned, IMAGE_MANIFEST_LIST_MEDIA_TYPE, IMAGE_MANIFEST_MEDIA_TYPE, OCI_IMAGE_INDEX_MEDIA_TYPE, OCI_IMAGE_MEDIA_TYPE, }; @@ -55,191 +53,6 @@ pub const DEFAULT_TOKEN_EXPIRATION_SECS: usize = 60; static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); -/// The data for an image or module. -#[derive(Clone)] -pub struct ImageData { - /// The layers of the image or module. - pub layers: Vec, - /// The digest of the image or module. - pub digest: Option, - /// The Configuration object of the image or module. - pub config: Config, - /// The manifest of the image or module. - pub manifest: Option, -} - -/// The data returned by an OCI registry after a successful push -/// operation is completed -pub struct PushResponse { - /// Pullable url for the config - pub config_url: String, - /// Pullable url for the manifest - pub manifest_url: String, -} - -/// The data returned by a successful tags/list Request -#[derive(Deserialize, Debug)] -pub struct TagResponse { - /// Repository Name - pub name: String, - /// List of existing Tags - pub tags: Vec, -} - -/// Layer descriptor required to pull a layer -pub struct LayerDescriptor<'a> { - /// The digest of the layer - pub digest: &'a str, - /// Optional list of additional URIs to pull the layer from - pub urls: &'a Option>, -} - -/// A trait for converting any type into a [`LayerDescriptor`] -pub trait AsLayerDescriptor { - /// Convert the type to a LayerDescriptor reference - fn as_layer_descriptor(&self) -> LayerDescriptor<'_>; -} - -impl AsLayerDescriptor for &T { - fn as_layer_descriptor(&self) -> LayerDescriptor<'_> { - (*self).as_layer_descriptor() - } -} - -impl AsLayerDescriptor for &str { - fn as_layer_descriptor(&self) -> LayerDescriptor<'_> { - LayerDescriptor { - digest: self, - urls: &None, - } - } -} - -impl AsLayerDescriptor for &OciDescriptor { - fn as_layer_descriptor(&self) -> LayerDescriptor<'_> { - LayerDescriptor { - digest: &self.digest, - urls: &self.urls, - } - } -} - -impl AsLayerDescriptor for &LayerDescriptor<'_> { - fn as_layer_descriptor(&self) -> LayerDescriptor<'_> { - LayerDescriptor { - digest: self.digest, - urls: self.urls, - } - } -} - -/// The data and media type for an image layer -#[derive(Clone, Debug, Eq, Hash, PartialEq)] -pub struct ImageLayer { - /// The data of this layer - pub data: Vec, - /// The media type of this layer - pub media_type: String, - /// This OPTIONAL property contains arbitrary metadata for this descriptor. - /// This OPTIONAL property MUST use the [annotation rules](https://github.com/opencontainers/image-spec/blob/main/annotations.md#rules) - pub annotations: Option>, -} - -impl ImageLayer { - /// Constructs a new ImageLayer struct with provided data and media type - pub fn new( - data: Vec, - media_type: String, - annotations: Option>, - ) -> Self { - ImageLayer { - data, - media_type, - annotations, - } - } - - /// Constructs a new ImageLayer struct with provided data and - /// media type application/vnd.oci.image.layer.v1.tar - pub fn oci_v1(data: Vec, annotations: Option>) -> Self { - Self::new(data, IMAGE_LAYER_MEDIA_TYPE.to_string(), annotations) - } - /// Constructs a new ImageLayer struct with provided data and - /// media type application/vnd.oci.image.layer.v1.tar+gzip - pub fn oci_v1_gzip(data: Vec, annotations: Option>) -> Self { - Self::new(data, IMAGE_LAYER_GZIP_MEDIA_TYPE.to_string(), annotations) - } - - /// Helper function to compute the sha256 digest of an image layer - pub fn sha256_digest(&self) -> String { - sha256_digest(&self.data) - } -} - -/// The data and media type for a configuration object -#[derive(Clone)] -pub struct Config { - /// The data of this config object - pub data: Vec, - /// The media type of this object - pub media_type: String, - /// This OPTIONAL property contains arbitrary metadata for this descriptor. - /// This OPTIONAL property MUST use the [annotation rules](https://github.com/opencontainers/image-spec/blob/main/annotations.md#rules) - pub annotations: Option>, -} - -impl Config { - /// Constructs a new Config struct with provided data and media type - pub fn new( - data: Vec, - media_type: String, - annotations: Option>, - ) -> Self { - Config { - data, - media_type, - annotations, - } - } - - /// Constructs a new Config struct with provided data and - /// media type application/vnd.oci.image.config.v1+json - pub fn oci_v1(data: Vec, annotations: Option>) -> Self { - Self::new(data, IMAGE_CONFIG_MEDIA_TYPE.to_string(), annotations) - } - - /// Construct a new Config struct with provided [`ConfigFile`] and - /// media type `application/vnd.oci.image.config.v1+json` - pub fn oci_v1_from_config_file( - config_file: ConfigFile, - annotations: Option>, - ) -> Result { - let data = serde_json::to_vec(&config_file)?; - Ok(Self::new( - data, - IMAGE_CONFIG_MEDIA_TYPE.to_string(), - annotations, - )) - } - - /// Helper function to compute the sha256 digest of this config object - pub fn sha256_digest(&self) -> String { - sha256_digest(&self.data) - } -} - -impl TryFrom for ConfigFile { - type Error = crate::errors::OciDistributionError; - - fn try_from(config: Config) -> Result { - let config = String::from_utf8(config.data) - .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?; - let config_file: ConfigFile = serde_json::from_str(&config) - .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?; - Ok(config_file) - } -} - /// The OCI client connects to an OCI registry and fetches OCI images. /// /// An OCI registry is a container registry that adheres to the OCI Distribution @@ -1755,7 +1568,7 @@ impl<'a> RequestBuilderWrapper<'a> { } // Composable functions applicable to a `RequestBuilderWrapper` -impl<'a> RequestBuilderWrapper<'a> { +impl RequestBuilderWrapper<'_> { fn apply_accept(&self, accept: &[&str]) -> Result { let request_builder = self .request_builder @@ -1948,9 +1761,10 @@ pub fn linux_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option { manifests .iter() .find(|entry| { - entry.platform.as_ref().map_or(false, |platform| { - platform.os == "linux" && platform.architecture == "amd64" - }) + entry + .platform + .as_ref() + .is_some_and(|platform| platform.os == "linux" && platform.architecture == "amd64") }) .map(|entry| entry.digest.clone()) } @@ -1960,7 +1774,7 @@ pub fn windows_amd64_resolver(manifests: &[ImageIndexEntry]) -> Option { manifests .iter() .find(|entry| { - entry.platform.as_ref().map_or(false, |platform| { + entry.platform.as_ref().is_some_and(|platform| { platform.os == "windows" && platform.architecture == "amd64" }) }) @@ -2008,7 +1822,7 @@ pub fn current_platform_resolver(manifests: &[ImageIndexEntry]) -> Option> for BearerChallenge { #[cfg(test)] mod test { use super::*; + use std::collections::BTreeMap; use std::convert::TryFrom; - use std::fs; - use std::path; use std::result::Result; + use std::{fs, path}; use rstest::rstest; use sha2::Digest as _; use tempfile::TempDir; + use tokio::io::AsyncReadExt; use tokio_util::io::StreamReader; use crate::manifest::{self, IMAGE_DOCKER_LAYER_GZIP_MEDIA_TYPE}; - #[cfg(feature = "test-registry")] use testcontainers::{ core::{Mount, WaitFor}, - runners::AsyncRunner, - ContainerRequest, GenericImage, ImageExt, + runners::AsyncRunner as _, + ContainerRequest, GenericImage, ImageExt as _, }; const HELLO_IMAGE_NO_TAG: &str = "webassembly.azurecr.io/hello-wasm"; @@ -2538,8 +2352,8 @@ mod test { } } - #[cfg(feature = "test-registry")] #[tokio::test] + #[cfg_attr(not(feature = "test-registry"), ignore)] async fn test_list_tags() { let test_container = registry_image_edge() .start() @@ -2923,19 +2737,16 @@ mod test { // https://github.com/distribution/distribution/pull/3143 // // We require this fix only when testing the capability to list tags - #[cfg(feature = "test-registry")] fn registry_image_edge() -> GenericImage { GenericImage::new("distribution/distribution", "edge") .with_wait_for(WaitFor::message_on_stderr("listening on ")) } - #[cfg(feature = "test-registry")] fn registry_image() -> GenericImage { GenericImage::new("docker.io/library/registry", "2") .with_wait_for(WaitFor::message_on_stderr("listening on ")) } - #[cfg(feature = "test-registry")] fn registry_image_basic_auth(auth_path: &str) -> ContainerRequest { GenericImage::new("docker.io/library/registry", "2") .with_wait_for(WaitFor::message_on_stderr("listening on ")) @@ -2946,7 +2757,7 @@ mod test { } #[tokio::test] - #[cfg(feature = "test-registry")] + #[cfg_attr(not(feature = "test-registry"), ignore)] async fn can_push_chunk() { let test_container = registry_image() .start() @@ -2996,7 +2807,7 @@ mod test { } #[tokio::test] - #[cfg(feature = "test-registry")] + #[cfg_attr(not(feature = "test-registry"), ignore)] async fn can_push_multiple_chunks() { let test_container = registry_image() .start() @@ -3039,7 +2850,7 @@ mod test { } #[tokio::test] - #[cfg(feature = "test-registry")] + #[cfg_attr(not(feature = "test-registry"), ignore)] async fn test_image_roundtrip_anon_auth() { let test_container = registry_image() .start() @@ -3050,7 +2861,7 @@ mod test { } #[tokio::test] - #[cfg(feature = "test-registry")] + #[cfg_attr(not(feature = "test-registry"), ignore)] async fn test_image_roundtrip_basic_auth() { let auth_dir = TempDir::new().expect("cannot create tmp directory"); let htpasswd_path = path::Path::join(auth_dir.path(), "htpasswd"); @@ -3070,7 +2881,6 @@ mod test { test_image_roundtrip(&auth, &test_container).await; } - #[cfg(feature = "test-registry")] async fn test_image_roundtrip( registry_auth: &RegistryAuth, test_container: &testcontainers::ContainerAsync, @@ -3177,6 +2987,8 @@ mod test { #[cfg(feature = "test-registry")] async fn test_mount() { // initialize the registry + + use crate::manifest::OciDescriptor; let test_container = registry_image() .start() .await diff --git a/src/lib.rs b/src/lib.rs index 4d762514..638e584a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,8 @@ use sha2::Digest; pub mod annotations; mod blob; +#[cfg(feature = "blocking")] +pub mod blocking; pub mod client; pub mod config; pub(crate) mod digest; @@ -12,9 +14,11 @@ pub mod errors; pub mod manifest; pub mod secrets; mod token_cache; +mod types; #[doc(inline)] pub use client::Client; + #[doc(inline)] pub use oci_spec::distribution::{ParseError, Reference}; #[doc(inline)] diff --git a/src/manifest.rs b/src/manifest.rs index 0d0f2eaa..acfe94b3 100644 --- a/src/manifest.rs +++ b/src/manifest.rs @@ -2,8 +2,8 @@ use std::collections::BTreeMap; use crate::{ - client::{Config, ImageLayer}, sha256_digest, + types::{Config, ImageLayer}, }; /// The mediatype for WASM layers. diff --git a/src/secrets.rs b/src/secrets.rs index 3be44b31..a918644b 100644 --- a/src/secrets.rs +++ b/src/secrets.rs @@ -21,3 +21,12 @@ impl Authenticable for reqwest::RequestBuilder { } } } +#[cfg(feature = "blocking")] +impl Authenticable for reqwest::blocking::RequestBuilder { + fn apply_authentication(self, auth: &RegistryAuth) -> Self { + match auth { + RegistryAuth::Anonymous => self, + RegistryAuth::Basic(username, password) => self.basic_auth(username, Some(password)), + } + } +} diff --git a/src/token_cache.rs b/src/token_cache.rs index 43606537..5447d510 100644 --- a/src/token_cache.rs +++ b/src/token_cache.rs @@ -66,6 +66,7 @@ struct TokenCacheKey { operation: RegistryOperation, } +#[derive(Debug, Clone)] struct TokenCacheValue { token: RegistryTokenType, expiration: u64, @@ -179,3 +180,114 @@ impl TokenCache { } } } + +#[cfg(feature = "blocking")] +#[derive(Clone)] +pub(crate) struct SyncTokenCache { + // (registry, repository, scope) -> (token, expiration) + tokens: BTreeMap, + /// Default token expiration in seconds, to use when claim doesn't specify a value + pub default_expiration_secs: usize, +} + +#[cfg(feature = "blocking")] +impl SyncTokenCache { + pub(crate) fn new(default_expiration_secs: usize) -> Self { + SyncTokenCache { + tokens: BTreeMap::new(), + default_expiration_secs, + } + } + + pub(crate) fn insert( + &mut self, + reference: &Reference, + op: RegistryOperation, + token: RegistryTokenType, + ) { + let expiration = match token { + RegistryTokenType::Basic(_, _) => u64::MAX, + RegistryTokenType::Bearer(ref t) => { + let token_str = t.token(); + match jwt::Token::< + jwt::header::Header, + jwt::claims::Claims, + jwt::token::Unverified, + >::parse_unverified(token_str) + { + Ok(token) => token.claims().registered.expiration.unwrap_or(u64::MAX), + Err(jwt::Error::NoClaimsComponent) => { + // the token doesn't have a claim that states a + // value for the expiration. We assume it has a 60 + // seconds validity as indicated here: + // https://docs.docker.com/registry/spec/auth/token/#requesting-a-token + // > (Optional) The duration in seconds since the token was issued + // > that it will remain valid. When omitted, this defaults to 60 seconds. + // > For compatibility with older clients, a token should never be returned + // > with less than 60 seconds to live. + let now = SystemTime::now(); + let epoch = now + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_secs(); + let expiration = epoch + self.default_expiration_secs as u64; + debug!(?token, "Cannot extract expiration from token's claims, assuming a {} seconds validity", self.default_expiration_secs); + expiration + }, + Err(error) => { + warn!(?error, "Invalid bearer token"); + return; + } + } + } + }; + let registry = reference.resolve_registry().to_string(); + let repository = reference.repository().to_string(); + debug!(%registry, %repository, ?op, %expiration, "Inserting token"); + self.tokens.insert( + TokenCacheKey { + registry, + repository, + operation: op, + }, + TokenCacheValue { token, expiration }, + ); + } + + pub(crate) fn get( + &self, + reference: &Reference, + op: RegistryOperation, + ) -> Option { + let registry = reference.resolve_registry().to_string(); + let repository = reference.repository().to_string(); + let key = TokenCacheKey { + registry, + repository, + operation: op, + }; + match self.tokens.get(&key) { + Some(TokenCacheValue { + ref token, + expiration, + }) => { + let now = SystemTime::now(); + let epoch = now + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_secs(); + if epoch > *expiration { + debug!(%key.registry, %key.repository, ?key.operation, %expiration, miss=false, expired=true, "Fetching token"); + None + } else { + debug!(%key.registry, %key.repository, ?key.operation, %expiration, miss=false, expired=false, "Fetching token"); + Some(token.clone()) + } + } + None => { + debug!(%key.registry, %key.repository, ?key.operation, miss = true, "Fetching token"); + None + } + } + } +} diff --git a/src/types.rs b/src/types.rs new file mode 100644 index 00000000..6beb0d0a --- /dev/null +++ b/src/types.rs @@ -0,0 +1,198 @@ +use std::collections::BTreeMap; + +use serde::Deserialize; + +use crate::{ + config::ConfigFile, + errors::{OciDistributionError, Result}, + manifest::{ + OciDescriptor, OciImageManifest, IMAGE_CONFIG_MEDIA_TYPE, IMAGE_LAYER_GZIP_MEDIA_TYPE, + IMAGE_LAYER_MEDIA_TYPE, + }, + sha256_digest, +}; + +/// The data for an image or module. +#[derive(Clone)] +pub struct ImageData { + /// The layers of the image or module. + pub layers: Vec, + /// The digest of the image or module. + pub digest: Option, + /// The Configuration object of the image or module. + pub config: Config, + /// The manifest of the image or module. + pub manifest: Option, +} + +/// The data returned by an OCI registry after a successful push +/// operation is completed +pub struct PushResponse { + /// Pullable url for the config + pub config_url: String, + /// Pullable url for the manifest + pub manifest_url: String, +} + +/// The data returned by a successful tags/list Request +#[derive(Deserialize, Debug)] +pub struct TagResponse { + /// Repository Name + pub name: String, + /// List of existing Tags + pub tags: Vec, +} + +/// Layer descriptor required to pull a layer +pub struct LayerDescriptor<'a> { + /// The digest of the layer + pub digest: &'a str, + /// Optional list of additional URIs to pull the layer from + pub urls: &'a Option>, +} + +/// A trait for converting any type into a [`LayerDescriptor`] +pub trait AsLayerDescriptor { + /// Convert the type to a LayerDescriptor reference + fn as_layer_descriptor(&self) -> LayerDescriptor<'_>; +} + +impl AsLayerDescriptor for &T { + fn as_layer_descriptor(&self) -> LayerDescriptor<'_> { + (*self).as_layer_descriptor() + } +} + +impl AsLayerDescriptor for &str { + fn as_layer_descriptor(&self) -> LayerDescriptor<'_> { + LayerDescriptor { + digest: self, + urls: &None, + } + } +} + +impl AsLayerDescriptor for &OciDescriptor { + fn as_layer_descriptor(&self) -> LayerDescriptor<'_> { + LayerDescriptor { + digest: &self.digest, + urls: &self.urls, + } + } +} + +impl AsLayerDescriptor for &LayerDescriptor<'_> { + fn as_layer_descriptor(&self) -> LayerDescriptor<'_> { + LayerDescriptor { + digest: self.digest, + urls: self.urls, + } + } +} + +/// The data and media type for an image layer +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub struct ImageLayer { + /// The data of this layer + pub data: Vec, + /// The media type of this layer + pub media_type: String, + /// This OPTIONAL property contains arbitrary metadata for this descriptor. + /// This OPTIONAL property MUST use the [annotation rules](https://github.com/opencontainers/image-spec/blob/main/annotations.md#rules) + pub annotations: Option>, +} + +impl ImageLayer { + /// Constructs a new ImageLayer struct with provided data and media type + pub fn new( + data: Vec, + media_type: String, + annotations: Option>, + ) -> Self { + ImageLayer { + data, + media_type, + annotations, + } + } + + /// Constructs a new ImageLayer struct with provided data and + /// media type application/vnd.oci.image.layer.v1.tar + pub fn oci_v1(data: Vec, annotations: Option>) -> Self { + Self::new(data, IMAGE_LAYER_MEDIA_TYPE.to_string(), annotations) + } + /// Constructs a new ImageLayer struct with provided data and + /// media type application/vnd.oci.image.layer.v1.tar+gzip + pub fn oci_v1_gzip(data: Vec, annotations: Option>) -> Self { + Self::new(data, IMAGE_LAYER_GZIP_MEDIA_TYPE.to_string(), annotations) + } + + /// Helper function to compute the sha256 digest of an image layer + pub fn sha256_digest(&self) -> String { + sha256_digest(&self.data) + } +} + +/// The data and media type for a configuration object +#[derive(Clone)] +pub struct Config { + /// The data of this config object + pub data: Vec, + /// The media type of this object + pub media_type: String, + /// This OPTIONAL property contains arbitrary metadata for this descriptor. + /// This OPTIONAL property MUST use the [annotation rules](https://github.com/opencontainers/image-spec/blob/main/annotations.md#rules) + pub annotations: Option>, +} + +impl Config { + /// Constructs a new Config struct with provided data and media type + pub fn new( + data: Vec, + media_type: String, + annotations: Option>, + ) -> Self { + Config { + data, + media_type, + annotations, + } + } + + /// Constructs a new Config struct with provided data and + /// media type application/vnd.oci.image.config.v1+json + pub fn oci_v1(data: Vec, annotations: Option>) -> Self { + Self::new(data, IMAGE_CONFIG_MEDIA_TYPE.to_string(), annotations) + } + + /// Construct a new Config struct with provided [`ConfigFile`] and + /// media type `application/vnd.oci.image.config.v1+json` + pub fn oci_v1_from_config_file( + config_file: ConfigFile, + annotations: Option>, + ) -> Result { + let data = serde_json::to_vec(&config_file)?; + Ok(Self::new( + data, + IMAGE_CONFIG_MEDIA_TYPE.to_string(), + annotations, + )) + } + + /// Helper function to compute the sha256 digest of this config object + pub fn sha256_digest(&self) -> String { + sha256_digest(&self.data) + } +} + +impl TryFrom for ConfigFile { + type Error = crate::errors::OciDistributionError; + + fn try_from(config: Config) -> Result { + let config = String::from_utf8(config.data) + .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?; + let config_file: ConfigFile = serde_json::from_str(&config) + .map_err(|e| OciDistributionError::ConfigConversionError(e.to_string()))?; + Ok(config_file) + } +}