From 98c7973df53b955241219da38ee2cbc5619f1756 Mon Sep 17 00:00:00 2001
From: Priyanshu Kumar
Date: Fri, 19 Dec 2025 18:19:51 +0000
Subject: [PATCH 1/2] Add get_blob_stream (prefer GetRawBlob)

Signed-off-by: Priyanshu Kumar
---
 Cargo.toml        |   2 +
 src/imageproxy.rs | 327 +++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 325 insertions(+), 4 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 237a2d4..b338c53 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,6 +10,7 @@ rust-version = "1.70.0"
 
 [dependencies]
 futures-util = "0.3.13"
+hex = "0.4.3"
 # NOTE when bumping this in a semver-incompatible way, because we re-export it you
 # must also bump the semver of this project.
 # See also https://github.com/youki-dev/oci-spec-rs/pull/288
@@ -18,6 +19,7 @@ rustix = { version = "1.0", features = ["process", "fs", "net"] }
 serde = { features = ["derive"], version = "1.0.125" }
 serde_json = "1.0.64"
 semver = "1.0.4"
+sha2 = "0.10.9"
 thiserror = "2"
 tokio = { features = ["fs", "io-util", "macros", "process", "rt", "sync"], version = "1" }
 tracing = "0.1"
diff --git a/src/imageproxy.rs b/src/imageproxy.rs
index eb0a6ad..8645d81 100644
--- a/src/imageproxy.rs
+++ b/src/imageproxy.rs
@@ -10,6 +10,7 @@ use futures_util::{Future, FutureExt};
 use itertools::Itertools;
 use oci_spec::image::{Descriptor, Digest};
 use serde::{Deserialize, Serialize};
+use sha2::Digest as _;
 use std::fs::File;
 use std::iter::FusedIterator;
 use std::num::NonZeroU32;
@@ -21,8 +22,8 @@ use std::pin::Pin;
 use std::process::{Command, Stdio};
 use std::sync::{Arc, Mutex, OnceLock};
 use thiserror::Error;
-use tokio::io::{AsyncBufRead, AsyncReadExt};
-use tokio::sync::Mutex as AsyncMutex;
+use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, ReadBuf};
+use tokio::sync::{oneshot, Mutex as AsyncMutex};
 use tokio::task::JoinError;
 use tracing::instrument;
 
@@ -45,6 +46,9 @@ pub enum Error {
     /// An error returned from the remote proxy
     #[error("proxy request returned error: {0}")]
     RequestReturned(Box<str>),
+    /// An error returned via the `GetRawBlob` error pipe.
+    #[error(transparent)]
+    BlobError(#[from] GetBlobError),
     #[error("semantic version error: {0}")]
     SemanticVersion(#[from] semver::Error),
     #[error("proxy too old (requested={requested_version} found={found_version}) error")]
@@ -88,6 +92,187 @@ impl From<JoinError> for Error {
 /// The error type returned from this crate.
 pub type Result<T> = std::result::Result<T, Error>;
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum BlobStreamSource {
+    /// Used `GetRawBlob`.
+    GetRawBlob,
+    /// Fell back to `GetBlob`.
+    GetBlob,
+}
+
+/// A streaming blob reader and "driver" future.
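+///
+/// The `reader` and `driver` halves should be polled concurrently; on the
+/// `GetRawBlob` path the blob is only verified once `driver` resolves. A
+/// sketch of the intended usage (marked `ignore`; `proxy`, `img`, `digest`,
+/// and `size` are assumed to be in scope):
+///
+/// ```ignore
+/// let stream = proxy.get_blob_stream(&img, &digest, size).await?;
+/// let mut reader = stream.reader;
+/// let copy = async move {
+///     let mut sink = tokio::io::sink();
+///     tokio::io::copy(&mut *reader, &mut sink).await
+/// };
+/// let (copied, driven) = tokio::join!(copy, stream.driver);
+/// assert_eq!(copied?, size);
+/// driven?; // surfaces size/digest mismatches from the raw path
+/// ```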
+pub struct BlobStream<'a> {
+    pub source: BlobStreamSource,
+    pub expected_size: u64,
+    pub reported_size: Option<u64>,
+    pub reader: Box<dyn AsyncRead + Send + Unpin>,
+    pub driver: futures_util::future::BoxFuture<'a, Result<()>>,
+}
+
+impl std::fmt::Debug for BlobStream<'_> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("BlobStream")
+            .field("source", &self.source)
+            .field("expected_size", &self.expected_size)
+            .field("reported_size", &self.reported_size)
+            .finish_non_exhaustive()
+    }
+}
+
+#[derive(Debug)]
+enum VerifiedBlobReadResult {
+    Complete { nbytes: u64, digest_hex: String },
+    Incomplete,
+}
+
+#[derive(Debug)]
+enum Hasher {
+    Sha256(sha2::Sha256),
+    Sha384(sha2::Sha384),
+    Sha512(sha2::Sha512),
+}
+
+impl Hasher {
+    fn new_for_digest(digest: &Digest) -> Result<Self> {
+        use oci_spec::image::DigestAlgorithm;
+        Ok(match digest.algorithm() {
+            DigestAlgorithm::Sha256 => Self::Sha256(sha2::Sha256::new()),
+            DigestAlgorithm::Sha384 => Self::Sha384(sha2::Sha384::new()),
+            DigestAlgorithm::Sha512 => Self::Sha512(sha2::Sha512::new()),
+            DigestAlgorithm::Other(a) => {
+                return Err(Error::Other(
+                    format!("Unsupported digest algorithm for blob verification: {a}").into(),
+                ));
+            }
+            _ => {
+                return Err(Error::Other(
+                    format!(
+                        "Unsupported digest algorithm for blob verification: {}",
+                        digest.algorithm().as_ref()
+                    )
+                    .into(),
+                ));
+            }
+        })
+    }
+
+    fn update(&mut self, chunk: &[u8]) {
+        match self {
+            Self::Sha256(h) => h.update(chunk),
+            Self::Sha384(h) => h.update(chunk),
+            Self::Sha512(h) => h.update(chunk),
+        }
+    }
+
+    fn finalize_hex(self) -> String {
+        match self {
+            Self::Sha256(h) => hex::encode(h.finalize()),
+            Self::Sha384(h) => hex::encode(h.finalize()),
+            Self::Sha512(h) => hex::encode(h.finalize()),
+        }
+    }
+}
+
+/// Wraps an [`AsyncRead`] and computes a digest; reports the result on EOF.
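+///
+/// When the inner reader signals EOF (a successful read that adds no bytes),
+/// the total byte count and final hex digest are sent over the `completion`
+/// channel. If the wrapper is dropped before EOF, the `Drop` impl sends
+/// [`VerifiedBlobReadResult::Incomplete`] instead, so a deliberately
+/// abandoned read is not misreported as a verification failure.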
+#[derive(Debug)]
+struct VerifiedBlobReader<R> {
+    inner: R,
+    nbytes: u64,
+    hasher: Option<Hasher>,
+    completion: Option<oneshot::Sender<VerifiedBlobReadResult>>,
+}
+
+impl<R> VerifiedBlobReader<R> {
+    fn new(
+        inner: R,
+        expected: Digest,
+        completion: oneshot::Sender<VerifiedBlobReadResult>,
+    ) -> Result<Self> {
+        let hasher = Hasher::new_for_digest(&expected)?;
+        Ok(Self {
+            inner,
+            nbytes: 0,
+            hasher: Some(hasher),
+            completion: Some(completion),
+        })
+    }
+}
+
+impl<R: AsyncRead + Unpin> AsyncRead for VerifiedBlobReader<R> {
+    fn poll_read(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &mut ReadBuf<'_>,
+    ) -> std::task::Poll<std::io::Result<()>> {
+        if buf.remaining() == 0 {
+            return std::task::Poll::Ready(Ok(()));
+        }
+        let before = buf.filled().len();
+        match Pin::new(&mut self.inner).poll_read(cx, buf) {
+            v @ std::task::Poll::Ready(Ok(_)) => {
+                let after = buf.filled().len();
+                debug_assert!(after >= before);
+                let delta = after - before;
+                if delta > 0 {
+                    let chunk = &buf.filled()[before..after];
+                    if let Some(hasher) = self.hasher.as_mut() {
+                        hasher.update(chunk);
+                    }
+                    self.nbytes += delta as u64;
+                } else {
+                    // EOF reached
+                    let Some(tx) = self.completion.take() else {
+                        return v;
+                    };
+                    let Some(hasher) = self.hasher.take() else {
+                        return v;
+                    };
+                    let _ = tx.send(VerifiedBlobReadResult::Complete {
+                        nbytes: self.nbytes,
+                        digest_hex: hasher.finalize_hex(),
+                    });
+                }
+                v
+            }
+            o => o,
+        }
+    }
+}
+
+impl<R> Drop for VerifiedBlobReader<R> {
+    fn drop(&mut self) {
+        if let Some(tx) = self.completion.take() {
+            let _ = tx.send(VerifiedBlobReadResult::Incomplete);
+        }
+    }
+}
+
+fn verify_blob_bytes_read(
+    expected: &Digest,
+    expected_size: u64,
+    r: VerifiedBlobReadResult,
+) -> Result<()> {
+    match r {
+        VerifiedBlobReadResult::Incomplete => Ok(()),
+        VerifiedBlobReadResult::Complete { nbytes, digest_hex } => {
+            if nbytes != expected_size {
+                return Err(Error::Other(
+                    format!(
+                        "Blob size mismatch for {expected}: expected {expected_size} bytes, read {nbytes} bytes"
+                    )
+                    .into(),
+                ));
+            }
+            if digest_hex != expected.digest() {
+                return Err(Error::Other(
+                    format!("Blob digest mismatch for {expected}: computed {digest_hex}").into(),
+                ));
+            }
+            Ok(())
+        }
+    }
+}
+
 /// Re-export because we use this in our public APIs
 pub use oci_spec;
 
@@ -116,6 +301,11 @@ fn layer_info_piped_proto_version() -> &'static semver::VersionReq {
     LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.7").unwrap())
 }
 
+fn raw_blob_proto_version() -> &'static semver::VersionReq {
+    static RAW_BLOB_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
+    RAW_BLOB_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.8").unwrap())
+}
+
 #[derive(Serialize)]
 struct Request {
     method: String,
@@ -483,8 +673,8 @@ impl ImageProxy {
         let supported = base_proto_version();
         if !supported.matches(&protover) {
             return Err(Error::ProxyTooOld {
-                requested_version: protover.to_string().into(),
-                found_version: supported.to_string().into(),
+                requested_version: supported.to_string().into(),
+                found_version: protover.to_string().into(),
             });
         }
         r.protover = protover;
@@ -492,6 +682,14 @@ impl ImageProxy {
         Ok(r)
     }
 
+    pub fn protocol_version(&self) -> &semver::Version {
+        &self.protover
+    }
+
+    pub fn supports_get_raw_blob(&self) -> bool {
+        raw_blob_proto_version().matches(&self.protover)
+    }
+
     /// Create and send a request. Should only be used by impl_request.
     async fn impl_request_raw(
         sockfd: Arc<Mutex<OwnedFd>>,
@@ -739,6 +937,69 @@
         Ok((bloblen, fd, err))
     }
 
+    /// Fetch a blob as a stream, preferring `GetRawBlob` and falling back to `GetBlob`.
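+    ///
+    /// `GetRawBlob` is used when the negotiated protocol matches semver
+    /// `0.2.8` (see [`Self::supports_get_raw_blob`]); its raw file descriptor
+    /// is wrapped in a digest-computing reader and checked client-side.
+    /// Older proxies transparently use the pipe-based `GetBlob` method.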
+    ///
+    /// The returned `driver` future completes only after proxy-side processing finishes; it also
+    /// verifies `expected_size` and `digest` for the `GetRawBlob` path.
+    #[instrument]
+    pub async fn get_blob_stream<'a>(
+        &'a self,
+        img: &OpenedImage,
+        digest: &Digest,
+        expected_size: u64,
+    ) -> Result<BlobStream<'a>> {
+        let fallback_to_get_blob = || async move {
+            let (reader, driver) = self.get_blob(img, digest, expected_size).await?;
+            let driver = async move { driver.await }.boxed();
+            Ok(BlobStream {
+                source: BlobStreamSource::GetBlob,
+                expected_size,
+                reported_size: Some(expected_size),
+                reader: Box::new(reader),
+                driver,
+            })
+        };
+
+        if !self.supports_get_raw_blob() {
+            return fallback_to_get_blob().await;
+        }
+
+        match self.get_raw_blob(img, digest).await {
+            Ok((reported_size, fd, err)) => {
+                if let Some(sz) = reported_size {
+                    if sz != expected_size {
+                        return Err(Error::Other(
+                            format!(
+                                "Blob size mismatch for {digest}: expected {expected_size} bytes, proxy reported {sz} bytes"
+                            )
+                            .into(),
+                        ));
+                    }
+                }
+
+                let expected = digest.clone();
+                let (tx, rx) = oneshot::channel();
+                let verified = VerifiedBlobReader::new(fd, expected.clone(), tx)?;
+                let driver = async move {
+                    err.await.map_err(Error::from)?;
+                    match rx.await {
+                        Ok(r) => verify_blob_bytes_read(&expected, expected_size, r),
+                        Err(_) => Ok(()),
+                    }
+                }
+                .boxed();
+                Ok(BlobStream {
+                    source: BlobStreamSource::GetRawBlob,
+                    expected_size,
+                    reported_size,
+                    reader: Box::new(verified),
+                    driver,
+                })
+            }
+            Err(e) => Err(e),
+        }
+    }
+
     /// Fetch a descriptor. The requested size and digest are verified (by the proxy process).
     #[instrument]
     pub async fn get_descriptor(
@@ -998,6 +1259,64 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_verified_blob_reader_ok() -> Result<()> {
+        use std::{io::Write as _, str::FromStr};
+        use tokio::io::AsyncReadExt;
+
+        let data = b"hello world";
+        let mut tmp = tempfile::NamedTempFile::new()?;
+        tmp.as_file_mut().write_all(data)?;
+        tmp.as_file_mut().sync_all()?;
+
+        let digest = {
+            let mut h = sha2::Sha256::new();
+            h.update(data);
+            Digest::from_str(&format!("sha256:{}", hex::encode(h.finalize()))).unwrap()
+        };
+
+        let fd = tokio::fs::File::open(tmp.path()).await?;
+        let (tx, rx) = oneshot::channel();
+        let mut reader = VerifiedBlobReader::new(fd, digest.clone(), tx)?;
+
+        let mut out = Vec::new();
+        reader.read_to_end(&mut out).await?;
+        assert_eq!(&out, data);
+
+        let result = rx.await.map_err(|e| Error::Other(e.to_string().into()))?;
+        verify_blob_bytes_read(&digest, data.len() as u64, result)?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_verified_blob_reader_digest_mismatch() -> Result<()> {
+        use std::{io::Write as _, str::FromStr};
+        use tokio::io::AsyncReadExt;
+
+        let data = b"hello world";
+        let mut tmp = tempfile::NamedTempFile::new()?;
+        tmp.as_file_mut().write_all(data)?;
+        tmp.as_file_mut().sync_all()?;
+
+        let digest = {
+            let mut h = sha2::Sha256::new();
+            h.update(b"not the content");
+            Digest::from_str(&format!("sha256:{}", hex::encode(h.finalize()))).unwrap()
+        };
+
+        let fd = tokio::fs::File::open(tmp.path()).await?;
+        let (tx, rx) = oneshot::channel();
+        let mut reader = VerifiedBlobReader::new(fd, digest.clone(), tx)?;
+
+        let mut out = Vec::new();
+        reader.read_to_end(&mut out).await?;
+        assert_eq!(&out, data);
+
+        let result = rx.await.map_err(|e| Error::Other(e.to_string().into()))?;
+        assert!(verify_blob_bytes_read(&digest, data.len() as u64, result).is_err());
+        Ok(())
+    }
+
     // Helper to create a dummy OwnedFd using memfd_create for testing.
     fn create_dummy_fd() -> OwnedFd {
         memfd_create(c"test-fd", MemfdFlags::CLOEXEC).unwrap()

From a8886bce9c5ebdb8e322037d9cc459cba8237322 Mon Sep 17 00:00:00 2001
From: Priyanshu Kumar
Date: Sat, 20 Dec 2025 05:23:34 +0000
Subject: [PATCH 2/2] ci: test get_blob_stream against old/new skopeo

Run tests in containers with skopeo 1.18 and >= 1.19 to exercise both
the GetBlob fallback and the GetRawBlob path.

Signed-off-by: Priyanshu Kumar
---
 .github/workflows/ci.yaml |  48 +++++++++++++++
 src/imageproxy.rs         | 126 +++++++++++++++++++++++++++++++++++++-
 2 files changed, 173 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index f273b23..43ac932 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -51,3 +51,51 @@
           fetch-depth: 20
       - name: Test ostree-rs-ext
         run: ./ci/test-ostree-rs-ext.sh
+
+  test-old-skopeo:
+    name: Test (old skopeo)
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout repository
+        uses: actions/checkout@v6
+        with:
+          ref: ${{ github.event.pull_request.head.sha }}
+          fetch-depth: 20
+      - name: Test in AlmaLinux bootc container (skopeo 1.18)
+        run: |
+          set -euxo pipefail
+          rm -rf .ci-cargo-home
+          mkdir -p .ci-cargo-home
+          docker run --rm \
+            -e CARGO_TARGET_DIR=/tmp/target \
+            -e EXPECT_BLOB_STREAM_SOURCE=GetBlob \
+            -v "$PWD:/src:ro" \
+            -v "$PWD/.ci-cargo-home:/root/.cargo" \
+            -w /src \
+            quay.io/almalinuxorg/almalinux-bootc:10.0 \
+            sh -lc 'dnf -y install cargo rustc >/dev/null && skopeo --version && cargo test --all-features -- --nocapture --quiet'
+          sudo rm -rf .ci-cargo-home
+
+  test-new-skopeo:
+    name: Test (new skopeo)
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout repository
+        uses: actions/checkout@v6
+        with:
+          ref: ${{ github.event.pull_request.head.sha }}
+          fetch-depth: 20
+      - name: Test in Fedora container (skopeo >= 1.19)
+        run: |
+          set -euxo pipefail
+          rm -rf .ci-cargo-home
+          mkdir -p .ci-cargo-home
+          docker run --rm \
+            -e CARGO_TARGET_DIR=/tmp/target \
+            -e EXPECT_BLOB_STREAM_SOURCE=GetRawBlob \
+            -v "$PWD:/src:ro" \
+            -v "$PWD/.ci-cargo-home:/root/.cargo" \
+            -w /src \
+            quay.io/fedora/fedora:41 \
+            sh -lc 'dnf -y install cargo rustc skopeo >/dev/null && skopeo --version && cargo test --all-features -- --nocapture --quiet'
+          sudo rm -rf .ci-cargo-home
diff --git a/src/imageproxy.rs b/src/imageproxy.rs
index 8645d81..f73a43e 100644
--- a/src/imageproxy.rs
+++ b/src/imageproxy.rs
@@ -950,7 +950,7 @@
     ) -> Result<BlobStream<'a>> {
         let fallback_to_get_blob = || async move {
             let (reader, driver) = self.get_blob(img, digest, expected_size).await?;
-            let driver = async move { driver.await }.boxed();
+            let driver = driver.boxed();
             Ok(BlobStream {
                 source: BlobStreamSource::GetBlob,
                 expected_size,
@@ -1317,6 +1317,130 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_get_blob_stream_oci_dir() -> Result<()> {
+        use std::str::FromStr;
+
+        if !check_skopeo() {
+            return Ok(());
+        }
+
+        fn sha256_digest(bytes: &[u8]) -> Digest {
+            let mut h = sha2::Sha256::new();
+            h.update(bytes);
+            Digest::from_str(&format!("sha256:{}", hex::encode(h.finalize()))).unwrap()
+        }
+
+        fn write_blob(root: &std::path::Path, bytes: &[u8]) -> Result<(Digest, u64)> {
+            let digest = sha256_digest(bytes);
+            let size = bytes.len() as u64;
+            let dir = root.join("blobs").join("sha256");
+            std::fs::create_dir_all(&dir)?;
+            std::fs::write(dir.join(digest.digest()), bytes)?;
+            Ok((digest, size))
+        }
+
+        let td = tempfile::tempdir()?;
+        std::fs::write(
+            td.path().join("oci-layout"),
+            serde_json::to_vec(&serde_json::json!({"imageLayoutVersion":"1.0.0"}))?,
+        )?;
+
+        let layer_bytes = b"layer bytes";
+        let (layer_digest, layer_size) = write_blob(td.path(), layer_bytes)?;
+
+        let config_bytes = serde_json::to_vec(&serde_json::json!({
+            "architecture": "amd64",
+            "os": "linux",
+            "rootfs": {
+                "type": "layers",
+                "diff_ids": [layer_digest.to_string()],
+            },
+            "config": {},
+        }))?;
+        let (config_digest, config_size) = write_blob(td.path(), &config_bytes)?;
+
+        let manifest_bytes = serde_json::to_vec(&serde_json::json!({
+            "schemaVersion": 2,
+            "mediaType": "application/vnd.oci.image.manifest.v1+json",
+            "config": {
+                "mediaType": "application/vnd.oci.image.config.v1+json",
+                "digest": config_digest.to_string(),
+                "size": config_size,
+            },
+            "layers": [{
+                "mediaType": "application/vnd.oci.image.layer.v1.tar",
+                "digest": layer_digest.to_string(),
+                "size": layer_size,
+            }],
+        }))?;
+        let (manifest_digest, manifest_size) = write_blob(td.path(), &manifest_bytes)?;
+
+        std::fs::write(
+            td.path().join("index.json"),
+            serde_json::to_vec(&serde_json::json!({
+                "schemaVersion": 2,
+                "mediaType": "application/vnd.oci.image.index.v1+json",
+                "manifests": [{
+                    "mediaType": "application/vnd.oci.image.manifest.v1+json",
+                    "digest": manifest_digest.to_string(),
+                    "size": manifest_size,
+                    "annotations": {
+                        "org.opencontainers.image.ref.name": "test",
+                    }
+                }]
+            }))?,
+        )?;
+
+        let proxy = ImageProxy::new().await?;
+        let imgref = format!("oci:{}:test", td.path().display());
+        let img = proxy.open_image(&imgref).await?;
+
+        let expected_source = match std::env::var("EXPECT_BLOB_STREAM_SOURCE").ok().as_deref() {
+            Some("GetRawBlob") => BlobStreamSource::GetRawBlob,
+            Some("GetBlob") => BlobStreamSource::GetBlob,
+            Some(v) => {
+                return Err(Error::Other(
+                    format!(
+                        "Invalid EXPECT_BLOB_STREAM_SOURCE={v}; expected GetRawBlob or GetBlob"
+                    )
+                    .into(),
+                ));
+            }
+            None => {
+                if proxy.supports_get_raw_blob() {
+                    BlobStreamSource::GetRawBlob
+                } else {
+                    BlobStreamSource::GetBlob
+                }
+            }
+        };
+
+        let BlobStream {
+            source,
+            mut reader,
+            driver,
+            ..
+        } = proxy
+            .get_blob_stream(&img, &layer_digest, layer_size)
+            .await?;
+        assert_eq!(source, expected_source);
+
+        // Drain the reader while concurrently driving proxy-side processing.
+        let mut sink = tokio::io::sink();
+        let read = async move {
+            let n = tokio::io::copy(&mut *reader, &mut sink).await?;
+            Result::Ok(n)
+        };
+        let (n, driven) = tokio::join!(read, driver);
+        assert_eq!(n?, layer_size);
+        driven?;
+
+        proxy.close_image(&img).await?;
+        proxy.finalize().await?;
+        Ok(())
+    }
+
     // Helper to create a dummy OwnedFd using memfd_create for testing.
     fn create_dummy_fd() -> OwnedFd {
         memfd_create(c"test-fd", MemfdFlags::CLOEXEC).unwrap()