From 40cd17501ed4be5f0cda9b418db2adfffc852058 Mon Sep 17 00:00:00 2001 From: Yuyi Wang Date: Mon, 22 Dec 2025 02:13:59 +0800 Subject: [PATCH 1/2] refactor(quic): redesign IO APIs --- compio-quic/Cargo.toml | 1 + compio-quic/benches/quic.rs | 5 +- compio-quic/examples/quic-client.rs | 4 +- compio-quic/examples/quic-dispatcher.rs | 8 +- compio-quic/examples/quic-server.rs | 4 +- compio-quic/src/recv_stream.rs | 140 +++++++++++------------- compio-quic/src/send_stream.rs | 34 +----- compio-quic/tests/basic.rs | 21 ++-- compio-quic/tests/echo.rs | 12 +- 9 files changed, 93 insertions(+), 136 deletions(-) diff --git a/compio-quic/Cargo.toml b/compio-quic/Cargo.toml index f5291c9d..519b3db2 100644 --- a/compio-quic/Cargo.toml +++ b/compio-quic/Cargo.toml @@ -86,3 +86,4 @@ required-features = ["compio-driver/fd-sync"] [[bench]] name = "quic" harness = false +required-features = ["io-compat"] diff --git a/compio-quic/benches/quic.rs b/compio-quic/benches/quic.rs index ed67af5e..62ee41d6 100644 --- a/compio-quic/benches/quic.rs +++ b/compio-quic/benches/quic.rs @@ -6,7 +6,7 @@ use std::{ use compio_buf::bytes::Bytes; use criterion::{BenchmarkId, Criterion, Throughput}; -use futures_util::{StreamExt, stream::FuturesUnordered}; +use futures_util::{AsyncWriteExt, StreamExt, stream::FuturesUnordered}; use rand::{RngCore, rng}; macro_rules! compio_spawn { @@ -125,8 +125,7 @@ async fn compio_quic_echo_client( send.finish().unwrap(); }, async { - let mut buf = vec![]; - recv.read_to_end(&mut buf).await.unwrap(); + recv.read_to_end(usize::MAX).await.unwrap(); } ); }) diff --git a/compio-quic/examples/quic-client.rs b/compio-quic/examples/quic-client.rs index f40b9d01..a785e23b 100644 --- a/compio-quic/examples/quic-client.rs +++ b/compio-quic/examples/quic-client.rs @@ -1,5 +1,6 @@ use std::net::{IpAddr, Ipv6Addr, SocketAddr}; +use compio_io::AsyncWrite; use compio_quic::ClientBuilder; use tracing_subscriber::EnvFilter; @@ -30,8 +31,7 @@ async fn main() { send.write(&[1, 2, 3]).await.unwrap(); send.finish().unwrap(); - let mut buf = vec![]; - recv.read_to_end(&mut buf).await.unwrap(); + let buf = recv.read_to_end(usize::MAX).await.unwrap(); println!("{buf:?}"); conn.close(1u32.into(), b"bye"); diff --git a/compio-quic/examples/quic-dispatcher.rs b/compio-quic/examples/quic-dispatcher.rs index 91f4150f..021b4d42 100644 --- a/compio-quic/examples/quic-dispatcher.rs +++ b/compio-quic/examples/quic-dispatcher.rs @@ -1,6 +1,7 @@ use std::num::NonZeroUsize; use compio_dispatcher::Dispatcher; +use compio_io::AsyncWriteExt; use compio_quic::{ClientBuilder, Endpoint, ServerBuilder}; use compio_runtime::spawn; use futures_util::{StreamExt, stream::FuturesUnordered}; @@ -40,9 +41,7 @@ async fn main() { .await .unwrap(); let mut send = conn.open_uni().unwrap(); - send.write_all(format!("Hello world {i}!").as_bytes()) - .await - .unwrap(); + send.write_all(format!("Hello world {i}!")).await.unwrap(); send.finish().unwrap(); send.stopped().await.unwrap(); } @@ -63,8 +62,7 @@ async fn main() { .dispatch(move || async move { let conn = incoming.await.unwrap(); let mut recv = conn.accept_uni().await.unwrap(); - let mut buf = vec![]; - recv.read_to_end(&mut buf).await.unwrap(); + let buf = recv.read_to_end(usize::MAX).await.unwrap(); println!("{}", std::str::from_utf8(&buf).unwrap()); }) .unwrap(); diff --git a/compio-quic/examples/quic-server.rs b/compio-quic/examples/quic-server.rs index 7a076a58..e50650bc 100644 --- a/compio-quic/examples/quic-server.rs +++ b/compio-quic/examples/quic-server.rs @@ -1,3 +1,4 @@ +use compio_io::AsyncWrite; use compio_quic::ServerBuilder; use tracing_subscriber::EnvFilter; @@ -24,8 +25,7 @@ async fn main() { let (mut send, mut recv) = conn.accept_bi().await.unwrap(); - let mut buf = vec![]; - recv.read_to_end(&mut buf).await.unwrap(); + let buf = recv.read_to_end(usize::MAX).await.unwrap(); println!("{buf:?}"); send.write(&[4, 5, 6]).await.unwrap(); diff --git a/compio-quic/src/recv_stream.rs b/compio-quic/src/recv_stream.rs index 333be852..e1913ffa 100644 --- a/compio-quic/src/recv_stream.rs +++ b/compio-quic/src/recv_stream.rs @@ -1,16 +1,13 @@ use std::{ - collections::BTreeMap, io, + mem::MaybeUninit, sync::Arc, task::{Context, Poll}, }; -use compio_buf::{ - BufResult, IoBufMut, - bytes::{BufMut, Bytes}, -}; +use compio_buf::{BufResult, IoBufMut, bytes::Bytes}; use compio_io::AsyncRead; -use futures_util::{future::poll_fn, ready}; +use futures_util::future::poll_fn; use quinn_proto::{Chunk, Chunks, ClosedStream, ReadableError, StreamId, VarInt}; use thiserror::Error; @@ -221,27 +218,44 @@ impl RecvStream { } } - fn poll_read( + /// Attempts to read from the stream into the provided buffer + /// + /// On success, returns `Poll::Ready(Ok(num_bytes_read))` and places data + /// into `buf`. If this returns zero bytes read (and `buf` has a + /// non-zero length), that indicates that the remote + /// side has [`finish`]ed the stream and the local side has already read all + /// bytes. + /// + /// If no data is available for reading, this returns `Poll::Pending` and + /// arranges for the current task (via `cx.waker()`) to be notified when + /// the stream becomes readable or is closed. + /// + /// [`finish`]: crate::SendStream::finish + pub fn poll_read_uninit( &mut self, cx: &mut Context, - mut buf: impl BufMut, - ) -> Poll, ReadError>> { - if !buf.has_remaining_mut() { - return Poll::Ready(Ok(Some(0))); + buf: &mut [MaybeUninit], + ) -> Poll> { + if buf.is_empty() { + return Poll::Ready(Ok(0)); } self.execute_poll_read(cx, true, |chunks| { let mut read = 0; loop { - if !buf.has_remaining_mut() { - // We know `read` is `true` because `buf.remaining()` was not 0 before + if read >= buf.len() { + // We know `read > 0` because `buf` cannot be empty here return ReadStatus::Readable(read); } - match chunks.next(buf.remaining_mut()) { + match chunks.next(buf.len() - read) { Ok(Some(chunk)) => { - read += chunk.bytes.len(); - buf.put(chunk.bytes); + let bytes = chunk.bytes; + let len = bytes.len(); + buf[read..read + len].copy_from_slice(unsafe { + std::slice::from_raw_parts(bytes.as_ptr().cast(), len) + }); + read += len; } res => { return (if read == 0 { None } else { Some(read) }, res.err()).into(); @@ -249,33 +263,7 @@ impl RecvStream { } } }) - } - - /// Read data contiguously from the stream. - /// - /// Yields the number of bytes read into `buf` on success, or `None` if the - /// stream was finished. - /// - /// This operation is cancel-safe. - pub async fn read(&mut self, mut buf: impl BufMut) -> Result, ReadError> { - poll_fn(|cx| self.poll_read(cx, &mut buf)).await - } - - /// Read an exact number of bytes contiguously from the stream. - /// - /// See [`read()`] for details. This operation is *not* cancel-safe. - /// - /// [`read()`]: RecvStream::read - pub async fn read_exact(&mut self, mut buf: impl BufMut) -> Result<(), ReadExactError> { - poll_fn(|cx| { - while buf.has_remaining_mut() { - if ready!(self.poll_read(cx, &mut buf))?.is_none() { - return Poll::Ready(Err(ReadExactError::FinishedEarly(buf.remaining_mut()))); - } - } - Poll::Ready(Ok(())) - }) - .await + .map(|res| res.map(|n| n.unwrap_or_default())) } /// Read the next segment of data. @@ -347,44 +335,44 @@ impl RecvStream { .await } - /// Convenience method to read all remaining data into a buffer. + /// Convenience method to read all remaining data into a buffer /// - /// Uses unordered reads to be more efficient than using [`AsyncRead`]. If - /// unordered reads have already been made, the resulting buffer may have - /// gaps containing zero. + /// Fails with [`ReadError::TooLong`] on reading more than `size_limit` + /// bytes, discarding all data read. Uses unordered reads to be more + /// efficient than using `AsyncRead` would allow. `size_limit` should be + /// set to limit worst-case memory use. /// - /// Depending on [`BufMut`] implementation, this method may fail with - /// [`ReadError::BufferTooShort`] if the buffer is not large enough to - /// hold the entire stream. For example when using a `&mut [u8]` it will - /// never receive bytes more than the length of the slice, but when using a - /// `&mut Vec` it will allocate more memory as needed. + /// If unordered reads have already been made, the resulting buffer may have + /// gaps containing arbitrary data. /// /// This operation is *not* cancel-safe. - pub async fn read_to_end(&mut self, mut buf: impl BufMut) -> Result { + pub async fn read_to_end(&mut self, size_limit: usize) -> Result, ReadError> { let mut start = u64::MAX; let mut end = 0; - let mut chunks = BTreeMap::new(); + let mut chunks = vec![]; loop { let Some(chunk) = self.read_chunk(usize::MAX, false).await? else { break; }; start = start.min(chunk.offset); end = end.max(chunk.offset + chunk.bytes.len() as u64); - if end - start > buf.remaining_mut() as u64 { - return Err(ReadError::BufferTooShort); + if (end - start) > size_limit as u64 { + return Err(ReadError::TooLong); } - chunks.insert(chunk.offset, chunk.bytes); + chunks.push((chunk.offset, chunk.bytes)); + } + if start == u64::MAX || start >= end { + // no data read + return Ok(vec![]); } - let mut last = 0; + let len = (end - start) as usize; + let mut buffer = vec![0u8; len]; for (offset, bytes) in chunks { let offset = (offset - start) as usize; - if offset > last { - buf.put_bytes(0, offset - last); - } - last = offset + bytes.len(); - buf.put(bytes); + let buf_len = bytes.len(); + buffer[offset..offset + buf_len].copy_from_slice(&bytes); } - Ok((end - start) as usize) + Ok(buffer) } } @@ -450,11 +438,11 @@ pub enum ReadError { /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt() #[error("0-RTT rejected")] ZeroRttRejected, - /// The stream is larger than the user-supplied buffer capacity. + /// The stream is larger than the user-supplied limit. /// /// Can only occur when using [`read_to_end()`](RecvStream::read_to_end). - #[error("buffer too short")] - BufferTooShort, + #[error("the stream is larger than the user-supplied limit")] + TooLong, } impl From for ReadError { @@ -481,7 +469,8 @@ impl From for io::Error { let kind = match x { Reset { .. } | ZeroRttRejected => io::ErrorKind::ConnectionReset, ConnectionLost(_) | ClosedStream => io::ErrorKind::NotConnected, - IllegalOrderedRead | BufferTooShort => io::ErrorKind::InvalidInput, + IllegalOrderedRead => io::ErrorKind::InvalidInput, + TooLong => io::ErrorKind::OutOfMemory, }; Self::new(kind, x) } @@ -500,14 +489,9 @@ pub enum ReadExactError { impl AsyncRead for RecvStream { async fn read(&mut self, mut buf: B) -> BufResult { - let res = self - .read(buf.as_uninit()) + let res = poll_fn(|cx| self.poll_read_uninit(cx, buf.as_uninit())) .await - .map(|n| { - let n = n.unwrap_or_default(); - unsafe { buf.advance_to(n) } - n - }) + .inspect(|&n| unsafe { buf.advance_to(n) }) .map_err(Into::into); BufResult(res, buf) } @@ -520,9 +504,11 @@ impl futures_util::AsyncRead for RecvStream { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { + // SAFETY: buf is valid self.get_mut() - .poll_read(cx, buf) - .map_ok(Option::unwrap_or_default) + .poll_read_uninit(cx, unsafe { + std::slice::from_raw_parts_mut(buf.as_mut_ptr().cast(), buf.len()) + }) .map_err(Into::into) } } diff --git a/compio-quic/src/send_stream.rs b/compio-quic/src/send_stream.rs index 88255b85..c2e0e669 100644 --- a/compio-quic/src/send_stream.rs +++ b/compio-quic/src/send_stream.rs @@ -179,35 +179,6 @@ impl SendStream { } } - /// Write bytes to the stream. - /// - /// Yields the number of bytes written on success. Congestion and flow - /// control may cause this to be shorter than `buf.len()`, indicating - /// that only a prefix of `buf` was written. - /// - /// This operation is cancel-safe. - pub async fn write(&mut self, buf: &[u8]) -> Result { - poll_fn(|cx| self.execute_poll_write(cx, |mut stream| stream.write(buf))).await - } - - /// Convenience method to write an entire buffer to the stream. - /// - /// This operation is *not* cancel-safe. - pub async fn write_all(&mut self, buf: &[u8]) -> Result<(), WriteError> { - let mut count = 0; - poll_fn(|cx| { - loop { - if count == buf.len() { - return Poll::Ready(Ok(())); - } - let n = - ready!(self.execute_poll_write(cx, |mut stream| stream.write(&buf[count..])))?; - count += n; - } - }) - .await - } - /// Write chunks to the stream. /// /// Yields the number of bytes and chunks written on success. @@ -329,7 +300,10 @@ impl From for io::Error { impl AsyncWrite for SendStream { async fn write(&mut self, buf: T) -> BufResult { - let res = self.write(buf.as_slice()).await.map_err(Into::into); + let res = + poll_fn(|cx| self.execute_poll_write(cx, |mut stream| stream.write(buf.as_slice()))) + .await + .map_err(Into::into); BufResult(res, buf) } diff --git a/compio-quic/tests/basic.rs b/compio-quic/tests/basic.rs index 4f0ed775..bdfdeb7b 100644 --- a/compio-quic/tests/basic.rs +++ b/compio-quic/tests/basic.rs @@ -4,6 +4,7 @@ use std::{ time::{Duration, Instant}, }; +use compio_io::{AsyncReadExt, AsyncWriteExt}; use compio_quic::{ClientBuilder, ConnectionError, Endpoint, TransportConfig}; use futures_util::join; @@ -102,8 +103,7 @@ async fn read_after_close() { .await .unwrap(); let mut recv = conn.accept_uni().await.unwrap(); - let mut buf = vec![]; - recv.read_to_end(&mut buf).await.unwrap(); + let buf = recv.read_to_end(usize::MAX).await.unwrap(); assert_eq!(buf, MSG); }, ); @@ -159,8 +159,7 @@ async fn zero_rtt() { join!( async { while let Ok(mut recv) = conn.accept_uni().await { - let mut buf = vec![]; - recv.read_to_end(&mut buf).await.unwrap(); + let buf = recv.read_to_end(usize::MAX).await.unwrap(); assert_eq!(buf, MSG0); } }, @@ -188,14 +187,15 @@ async fn zero_rtt() { .await .unwrap(); - let mut buf = vec![]; let mut recv = conn.accept_uni().await.unwrap(); - recv.read_to_end(&mut buf).await.expect("read_to_end"); + let mut buf = recv.read_to_end(usize::MAX).await.expect("read_to_end"); assert_eq!(buf, MSG0); buf.clear(); let mut recv = conn.accept_uni().await.unwrap(); - recv.read_to_end(&mut buf).await.expect("read_to_end"); + let (_, buf) = AsyncReadExt::read_to_end(&mut recv, buf) + .await + .expect("read_to_end"); assert_eq!(buf, MSG1); } @@ -209,16 +209,17 @@ async fn zero_rtt() { send.write_all(MSG0).await.unwrap(); send.finish().unwrap(); - let mut buf = vec![]; let mut recv = conn.accept_uni().await.unwrap(); - recv.read_to_end(&mut buf).await.expect("read_to_end"); + let mut buf = recv.read_to_end(usize::MAX).await.expect("read_to_end"); assert_eq!(buf, MSG0); assert!(conn.accepted_0rtt().await.unwrap()); buf.clear(); let mut recv = conn.accept_uni().await.unwrap(); - recv.read_to_end(&mut buf).await.expect("read_to_end"); + let (_, buf) = AsyncReadExt::read_to_end(&mut recv, buf) + .await + .expect("read_to_end"); assert_eq!(buf, MSG1); }, ); diff --git a/compio-quic/tests/echo.rs b/compio-quic/tests/echo.rs index 86d0c2ea..60406e08 100644 --- a/compio-quic/tests/echo.rs +++ b/compio-quic/tests/echo.rs @@ -4,6 +4,7 @@ use std::{ }; use compio_buf::bytes::Bytes; +use compio_io::AsyncWriteExt; use compio_quic::{Endpoint, RecvStream, SendStream, TransportConfig}; mod common; @@ -88,16 +89,13 @@ async fn run_echo(args: EchoArgs) { let (mut send, mut recv) = conn.open_bi_wait().await.unwrap(); let msg = gen_data(args.stream_size); - let (_, data) = join!( + let (msg, data) = join!( async { - send.write_all(&msg).await.unwrap(); + let (_, msg) = send.write_all(msg).await.unwrap(); send.finish().unwrap(); + msg }, - async { - let mut buf = vec![]; - recv.read_to_end(&mut buf).await.unwrap(); - buf - } + async { recv.read_to_end(usize::MAX).await.unwrap() } ); assert_eq!(data, msg); From 04a9cd85711d3ce7b63fdb8e16dc85bad9d99e8d Mon Sep 17 00:00:00 2001 From: Yuyi Wang Date: Mon, 22 Dec 2025 02:33:29 +0800 Subject: [PATCH 2/2] fix(quic): apply suggestions --- compio-quic/src/recv_stream.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/compio-quic/src/recv_stream.rs b/compio-quic/src/recv_stream.rs index e1913ffa..4a4f1127 100644 --- a/compio-quic/src/recv_stream.rs +++ b/compio-quic/src/recv_stream.rs @@ -335,7 +335,7 @@ impl RecvStream { .await } - /// Convenience method to read all remaining data into a buffer + /// Convenience method to read all remaining data into a buffer. /// /// Fails with [`ReadError::TooLong`] on reading more than `size_limit` /// bytes, discarding all data read. Uses unordered reads to be more @@ -343,7 +343,7 @@ impl RecvStream { /// set to limit worst-case memory use. /// /// If unordered reads have already been made, the resulting buffer may have - /// gaps containing arbitrary data. + /// gaps containing zeros. /// /// This operation is *not* cancel-safe. pub async fn read_to_end(&mut self, size_limit: usize) -> Result, ReadError> { @@ -470,7 +470,7 @@ impl From for io::Error { Reset { .. } | ZeroRttRejected => io::ErrorKind::ConnectionReset, ConnectionLost(_) | ClosedStream => io::ErrorKind::NotConnected, IllegalOrderedRead => io::ErrorKind::InvalidInput, - TooLong => io::ErrorKind::OutOfMemory, + TooLong => io::ErrorKind::InvalidData, }; Self::new(kind, x) }