diff --git a/Cargo.toml b/Cargo.toml index 43b36f1..002abe9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "evdev" -version = "0.13.2" +version = "0.13.3" authors = ["Corey Richardson "] description = "evdev interface for Linux" license = "Apache-2.0 OR MIT" @@ -11,6 +11,7 @@ rust-version = "1.64" [features] serde = ["dep:serde"] +async-io = ["dep:async-io", "dep:async-fs", "dep:futures-lite"] tokio = ["dep:tokio"] stream-trait = ["tokio", "futures-core"] device-test = [] @@ -24,15 +25,23 @@ nix = { version = "0.29", features = ["ioctl", "fs", "event"] } serde = { version = "1.0", features = ["derive"], optional = true } tokio = { version = "1.17", features = ["fs","time", "net"], optional = true } futures-core = { version = "0.3", optional = true } +async-io = { version = "2.6.0", optional = true } +async-fs = { version = "2.2.0", optional = true } +futures-lite = { version = "2.6.1", optional = true } [dev-dependencies] tokio = { version = "1.17", features = ["macros", "rt-multi-thread", "time"] } itertools = "0.10" +async-executor = "1.13.3" [[example]] name = "evtest_tokio" required-features = ["tokio"] +[[example]] +name = "evtest_async-io" +required-features = ["async-io"] + [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] diff --git a/examples/evtest_async-io.rs b/examples/evtest_async-io.rs new file mode 100644 index 0000000..dddc2e6 --- /dev/null +++ b/examples/evtest_async-io.rs @@ -0,0 +1,17 @@ +//! Demonstrating how to monitor events with evdev + async-io + +// cli/"tui" shared between the evtest examples +mod _pick_device; + +fn main() { + let d = _pick_device::pick_device(); + println!("{}", d); + println!("Events:"); + let mut events = d.into_event_stream().unwrap(); + futures_lite::future::block_on(async { + loop { + let ev = events.next_event().await.unwrap(); + println!("{:?}", ev); + } + }); +} diff --git a/src/lib.rs b/src/lib.rs index 89529ae..b73380f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -147,8 +147,8 @@ //! async runtime with the fd returned by `::as_raw_fd` to process events when //! they are ready. //! -//! For demonstrations of how to use this library in blocking, nonblocking, and async (tokio) modes, -//! please reference the "examples" directory. +//! For demonstrations of how to use this library in blocking, nonblocking, and async +//! (tokio / async-io) modes, please reference the "examples" directory. // should really be cfg(target_os = "linux") and maybe also android? #![cfg(unix)] @@ -161,6 +161,9 @@ // (see https://github.com/rust-lang/rust/pull/100883#issuecomment-1264470491) #![cfg_attr(docsrs, feature(doc_auto_cfg))] +#[cfg(all(feature = "tokio", feature = "async-io"))] +compile_error!("Features 'tokio' and 'async-io' are mutually exclusive"); + // has to be first for its macro #[macro_use] mod attribute_set; diff --git a/src/raw_stream.rs b/src/raw_stream.rs index 8714bda..64f9044 100644 --- a/src/raw_stream.rs +++ b/src/raw_stream.rs @@ -636,7 +636,7 @@ impl RawDevice { Ok(keycode as u32) } - #[cfg(feature = "tokio")] + #[cfg(any(feature = "tokio", feature = "async-io"))] #[inline] pub fn into_event_stream(self) -> io::Result { EventStream::new(self) @@ -762,12 +762,15 @@ impl Iterator for EnumerateDevices { } } -#[cfg(feature = "tokio")] -mod tokio_stream { +#[cfg(any(feature = "tokio", feature = "async-io"))] +mod async_stream { use super::*; + #[cfg(feature = "async-io")] + use async_io::Async as AsyncFd; use std::future::poll_fn; use std::task::{ready, Context, Poll}; + #[cfg(feature = "tokio")] use tokio::io::unix::AsyncFd; /// An asynchronous stream of input events. @@ -796,10 +799,19 @@ mod tokio_stream { } /// Returns a mutable reference to the underlying device. + #[cfg(feature = "tokio")] pub fn device_mut(&mut self) -> &mut RawDevice { self.device.get_mut() } + /// Returns a mutable reference to the underlying device. + /// # Safety + /// Must not drop the mutable reference with mem::swap() or mem::take(). + #[cfg(feature = "async-io")] + pub unsafe fn device_mut_nodrop(&mut self) -> &mut RawDevice { + unsafe { self.device.get_mut() } + } + /// Try to wait for the next event in this stream. Any errors are likely to be fatal, i.e. /// any calls afterwards will likely error as well. pub async fn next_event(&mut self) -> io::Result { @@ -813,14 +825,23 @@ mod tokio_stream { self.index += 1; return Poll::Ready(Ok(InputEvent::from(ev))); } - - self.device.get_mut().event_buf.clear(); + #[allow(unused_unsafe)] // async-io requires unsafe, tokio does not + unsafe { self.device.get_mut().event_buf.clear() }; self.index = 0; loop { - let mut guard = ready!(self.device.poll_read_ready_mut(cx))?; - - let res = guard.try_io(|device| device.get_mut().fill_events()); + let res = { + #[cfg(feature = "tokio")] + { + let mut guard = ready!(self.device.poll_read_ready_mut(cx))?; + guard.try_io(|device| device.get_mut().fill_events()) + } + #[cfg(feature = "async-io")] + { + ready!(self.device.poll_readable(cx))?; + unsafe { io::Result::Ok(self.device.get_mut().fill_events()) } + } + }; match res { Ok(res) => { let _ = res?; @@ -844,5 +865,5 @@ mod tokio_stream { } } } -#[cfg(feature = "tokio")] -pub use tokio_stream::EventStream; +#[cfg(any(feature = "tokio", feature = "async-io"))] +pub use async_stream::EventStream; diff --git a/src/sync_stream.rs b/src/sync_stream.rs index c3605f4..9f36ab8 100644 --- a/src/sync_stream.rs +++ b/src/sync_stream.rs @@ -357,7 +357,7 @@ impl Device { }) } - #[cfg(feature = "tokio")] + #[cfg(any(feature = "tokio", feature = "async-io"))] pub fn into_event_stream(self) -> io::Result { EventStream::new(self) } @@ -779,12 +779,15 @@ impl fmt::Display for Device { } } -#[cfg(feature = "tokio")] -mod tokio_stream { +#[cfg(any(feature = "tokio", feature = "async-io"))] +mod async_stream { use super::*; + #[cfg(feature = "async-io")] + use async_io::Async as AsyncFd; use std::future::poll_fn; use std::task::{ready, Context, Poll}; + #[cfg(feature = "tokio")] use tokio::io::unix::AsyncFd; /// An asynchronous stream of input events. @@ -819,10 +822,19 @@ mod tokio_stream { } /// Returns a mutable reference to the underlying device + #[cfg(feature = "tokio")] pub fn device_mut(&mut self) -> &mut Device { self.device.get_mut() } + /// Returns a mutable reference to the underlying device. + /// # Safety + /// Must not drop the mutable reference with mem::swap() or mem::take(). + #[cfg(feature = "async-io")] + pub unsafe fn device_mut_nodrop(&mut self) -> &mut Device { + unsafe { self.device.get_mut() } + } + /// Try to wait for the next event in this stream. Any errors are likely to be fatal, i.e. /// any calls afterwards will likely error as well. pub async fn next_event(&mut self) -> io::Result { @@ -832,7 +844,8 @@ mod tokio_stream { /// A lower-level function for directly polling this stream. pub fn poll_event(&mut self, cx: &mut Context<'_>) -> Poll> { 'outer: loop { - let dev = self.device.get_mut(); + #[allow(unused_unsafe)] // async-io requires unsafe, tokio does not + let dev = unsafe { self.device.get_mut() }; if let Some(ev) = compensate_events(&mut self.sync, dev) { return Poll::Ready(Ok(ev)); } @@ -856,9 +869,18 @@ mod tokio_stream { self.consumed_to = 0; loop { - let mut guard = ready!(self.device.poll_read_ready_mut(cx))?; - - let res = guard.try_io(|device| device.get_mut().fetch_events_inner()); + let res = { + #[cfg(feature = "tokio")] + { + let mut guard = ready!(self.device.poll_read_ready_mut(cx))?; + guard.try_io(|device| device.get_mut().fetch_events_inner()) + } + #[cfg(feature = "async-io")] + { + ready!(self.device.poll_readable(cx))?; + unsafe { io::Result::Ok(self.device.get_mut().fetch_events_inner()) } + } + }; match res { Ok(res) => { self.sync = res?; @@ -883,8 +905,8 @@ mod tokio_stream { } } } -#[cfg(feature = "tokio")] -pub use tokio_stream::EventStream; +#[cfg(any(feature = "tokio", feature = "async-io"))] +pub use async_stream::EventStream; #[cfg(test)] mod tests { diff --git a/src/uinput.rs b/src/uinput.rs index d79e3c7..f73cb56 100644 --- a/src/uinput.rs +++ b/src/uinput.rs @@ -300,10 +300,13 @@ impl VirtualDevice { } /// Get the syspaths of the corresponding device nodes in /dev/input. - #[cfg(feature = "tokio")] + #[cfg(any(feature = "tokio", feature = "async-io"))] pub async fn enumerate_dev_nodes(&mut self) -> io::Result { let path = self.get_syspath()?; + #[cfg(feature = "tokio")] let dir = tokio::fs::read_dir(path).await?; + #[cfg(feature = "async-io")] + let dir = async_fs::read_dir(path).await?; Ok(DevNodes { dir }) } @@ -401,7 +404,7 @@ impl VirtualDevice { Ok(self.event_buf.drain(..).map(InputEvent::from)) } - #[cfg(feature = "tokio")] + #[cfg(any(feature = "tokio", feature = "async-io"))] #[inline] pub fn into_event_stream(self) -> io::Result { VirtualEventStream::new(self) @@ -445,16 +448,31 @@ impl Iterator for DevNodesBlocking { /// This struct is returned from the [VirtualDevice::enumerate_dev_nodes_blocking] function and /// will yield the syspaths corresponding to the virtual device. These are of the form /// `/dev/input123`. -#[cfg(feature = "tokio")] +#[cfg(any(feature = "tokio", feature = "async-io"))] pub struct DevNodes { + #[cfg(feature = "tokio")] dir: tokio::fs::ReadDir, + #[cfg(feature = "async-io")] + dir: async_fs::ReadDir, } -#[cfg(feature = "tokio")] +#[cfg(any(feature = "tokio", feature = "async-io"))] impl DevNodes { /// Returns the next entry in the set of device nodes. pub async fn next_entry(&mut self) -> io::Result> { - while let Some(entry) = self.dir.next_entry().await? { + while let Some(entry) = { + #[cfg(feature = "tokio")] + { + self.dir.next_entry().await? + } + #[cfg(feature = "async-io")] + { + match futures_lite::StreamExt::next(&mut self.dir).await { + Some(v) => Some(v?), + None => None, + } + } + } { // Map the directory name to its file name. let file_name = entry.file_name(); @@ -562,12 +580,16 @@ impl Drop for FFEraseEvent { } } -#[cfg(feature = "tokio")] -mod tokio_stream { +#[cfg(any(feature = "tokio", feature = "async-io"))] +mod async_stream { use super::*; use std::future::poll_fn; use std::task::{ready, Context, Poll}; + + #[cfg(feature = "async-io")] + use async_io::Async as AsyncFd; + #[cfg(feature = "tokio")] use tokio::io::unix::AsyncFd; /// An asynchronous stream of input events. @@ -596,10 +618,19 @@ mod tokio_stream { } /// Returns a mutable reference to the underlying device. + #[cfg(feature = "tokio")] pub fn device_mut(&mut self) -> &mut VirtualDevice { self.device.get_mut() } + /// Returns a mutable reference to the underlying device. + /// # Safety + /// Must not drop the mutable reference with mem::swap() or mem::take(). + #[cfg(feature = "async-io")] + pub unsafe fn device_mut_nodrop(&mut self) -> &mut VirtualDevice { + unsafe { self.device.get_mut() } + } + /// Try to wait for the next event in this stream. Any errors are likely to be fatal, i.e. /// any calls afterwards will likely error as well. pub async fn next_event(&mut self) -> io::Result { @@ -613,14 +644,24 @@ mod tokio_stream { self.index += 1; return Poll::Ready(Ok(InputEvent::from(ev))); } - - self.device.get_mut().event_buf.clear(); + #[allow(unused_unsafe)] // async-io requires unsafe, tokio does not + unsafe { self.device.get_mut().event_buf.clear() }; self.index = 0; loop { - let mut guard = ready!(self.device.poll_read_ready_mut(cx))?; + let res = { + #[cfg(feature = "tokio")] + { + let mut guard = ready!(self.device.poll_read_ready_mut(cx))?; + guard.try_io(|device| device.get_mut().fill_events()) + } + #[cfg(feature = "async-io")] + { + ready!(self.device.poll_readable(cx))?; + unsafe { io::Result::Ok(self.device.get_mut().fill_events()) } + } + }; - let res = guard.try_io(|device| device.get_mut().fill_events()); match res { Ok(res) => { let _ = res?; @@ -644,5 +685,5 @@ mod tokio_stream { } } } -#[cfg(feature = "tokio")] -pub use tokio_stream::VirtualEventStream; +#[cfg(any(feature = "tokio", feature = "async-io"))] +pub use async_stream::VirtualEventStream; diff --git a/tests/virtual_device.rs b/tests/virtual_device.rs index 2d8f918..38fb022 100644 --- a/tests/virtual_device.rs +++ b/tests/virtual_device.rs @@ -1,56 +1,87 @@ -#![cfg(feature = "tokio")] +#![cfg(any(feature = "tokio", feature = "async-io"))] use std::error::Error; use std::thread::sleep; use std::time::Duration; +#[cfg(feature = "tokio")] use tokio::time::timeout; +#[cfg(feature = "async-io")] +use futures_lite::FutureExt; use evdev::{uinput::VirtualDevice, AttributeSet, EventType, InputEvent, KeyCode}; -#[tokio::test] -async fn test_virtual_device_actually_emits() -> Result<(), Box> { - let mut keys = AttributeSet::::new(); - let virtual_device_name = "fake-keyboard"; - keys.insert(KeyCode::KEY_ESC); +#[test] +fn test_virtual_device_actually_emits() -> Result<(), Box> { + #[cfg(feature = "async-io")] + let ex = async_executor::Executor::new(); - let mut device = VirtualDevice::builder()? - .name(virtual_device_name) - .with_keys(&keys)? - .build() - .unwrap(); - - let mut maybe_device = None; - sleep(Duration::from_millis(500)); - for (_i, d) in evdev::enumerate() { - println!("{:?}", d.name()); - if d.name() == Some(virtual_device_name) { - maybe_device = Some(d); - break; + let fut = async { + let mut keys = AttributeSet::::new(); + let virtual_device_name = "fake-keyboard"; + keys.insert(KeyCode::KEY_ESC); + + let mut device = VirtualDevice::builder()? + .name(virtual_device_name) + .with_keys(&keys)? + .build() + .unwrap(); + + let mut maybe_device = None; + sleep(Duration::from_millis(500)); + for (_i, d) in evdev::enumerate() { + println!("{:?}", d.name()); + if d.name() == Some(virtual_device_name) { + maybe_device = Some(d); + break; + } } - } - assert!(maybe_device.is_some()); - let listen_device = maybe_device.unwrap(); + assert!(maybe_device.is_some()); + let listen_device = maybe_device.unwrap(); + + let type_ = EventType::KEY; + let code = KeyCode::KEY_ESC.code(); - let type_ = EventType::KEY; - let code = KeyCode::KEY_ESC.code(); + let fut = async move { + // try to read the key code that will be sent through virtual device + let mut events = listen_device.into_event_stream()?; + events.next_event().await + }; - // listen for events on the listen device - let listener = tokio::spawn(async move { - // try to read the key code that will be sent through virtual device - let mut events = listen_device.into_event_stream()?; - events.next_event().await - }); + // listen for events on the listen device + #[cfg(feature = "tokio")] + let listener = tokio::spawn(fut); + #[cfg(feature = "async-io")] + let listener = ex.spawn(fut); - // emit a key code through virtual device - let down_event = InputEvent::new(type_.0, code, 10); - device.emit(&[down_event]).unwrap(); + // emit a key code through virtual device + let down_event = InputEvent::new(type_.0, code, 10); + device.emit(&[down_event]).unwrap(); - let event = timeout(Duration::from_secs(1), listener).await???; + let time = Duration::from_secs(1); + #[cfg(feature = "tokio")] + let event = timeout(time, listener).await???; + #[cfg(feature = "async-io")] + let event = listener.or(async { + async_io::Timer::after(time).await; + Err(std::io::ErrorKind::TimedOut.into()) + }).await?; - assert_eq!(down_event.event_type(), event.event_type()); - assert_eq!(down_event.code(), event.code()); + assert_eq!(down_event.event_type(), event.event_type()); + assert_eq!(down_event.code(), event.code()); + + // wait for listener + Ok(()) + }; + + #[cfg(feature = "tokio")] + let res = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(fut); + #[cfg(feature = "async-io")] + let res = futures_lite::future::block_on(fut); - // wait for listener - Ok(()) + res }