Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "evdev"
version = "0.13.2"
version = "0.13.3"
authors = ["Corey Richardson <corey@octayn.net>"]
description = "evdev interface for Linux"
license = "Apache-2.0 OR MIT"
Expand All @@ -11,6 +11,7 @@ rust-version = "1.64"

[features]
serde = ["dep:serde"]
async-io = ["dep:async-io", "dep:async-fs", "dep:futures-lite"]
tokio = ["dep:tokio"]
stream-trait = ["tokio", "futures-core"]
device-test = []
Expand All @@ -24,15 +25,23 @@ nix = { version = "0.29", features = ["ioctl", "fs", "event"] }
serde = { version = "1.0", features = ["derive"], optional = true }
tokio = { version = "1.17", features = ["fs","time", "net"], optional = true }
futures-core = { version = "0.3", optional = true }
async-io = { version = "2.6.0", optional = true }
async-fs = { version = "2.2.0", optional = true }
futures-lite = { version = "2.6.1", optional = true }

[dev-dependencies]
tokio = { version = "1.17", features = ["macros", "rt-multi-thread", "time"] }
itertools = "0.10"
async-executor = "1.13.3"

[[example]]
name = "evtest_tokio"
required-features = ["tokio"]

[[example]]
name = "evtest_async-io"
required-features = ["async-io"]

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
17 changes: 17 additions & 0 deletions examples/evtest_async-io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
//! Demonstrating how to monitor events with evdev + async-io

// cli/"tui" shared between the evtest examples
mod _pick_device;

fn main() {
let d = _pick_device::pick_device();
println!("{}", d);
println!("Events:");
let mut events = d.into_event_stream().unwrap();
futures_lite::future::block_on(async {
loop {
let ev = events.next_event().await.unwrap();
println!("{:?}", ev);
}
});
}
7 changes: 5 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@
//! async runtime with the fd returned by `<Device as AsRawFd>::as_raw_fd` to process events when
//! they are ready.
//!
//! For demonstrations of how to use this library in blocking, nonblocking, and async (tokio) modes,
//! please reference the "examples" directory.
//! For demonstrations of how to use this library in blocking, nonblocking, and async
//! (tokio / async-io) modes, please reference the "examples" directory.

// should really be cfg(target_os = "linux") and maybe also android?
#![cfg(unix)]
Expand All @@ -161,6 +161,9 @@
// (see https://github.com/rust-lang/rust/pull/100883#issuecomment-1264470491)
#![cfg_attr(docsrs, feature(doc_auto_cfg))]

#[cfg(all(feature = "tokio", feature = "async-io"))]
compile_error!("Features 'tokio' and 'async-io' are mutually exclusive");

// has to be first for its macro
#[macro_use]
mod attribute_set;
Expand Down
41 changes: 31 additions & 10 deletions src/raw_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ impl RawDevice {
Ok(keycode as u32)
}

#[cfg(feature = "tokio")]
#[cfg(any(feature = "tokio", feature = "async-io"))]
#[inline]
pub fn into_event_stream(self) -> io::Result<EventStream> {
EventStream::new(self)
Expand Down Expand Up @@ -762,12 +762,15 @@ impl Iterator for EnumerateDevices {
}
}

#[cfg(feature = "tokio")]
mod tokio_stream {
#[cfg(any(feature = "tokio", feature = "async-io"))]
mod async_stream {
use super::*;

#[cfg(feature = "async-io")]
use async_io::Async as AsyncFd;
use std::future::poll_fn;
use std::task::{ready, Context, Poll};
#[cfg(feature = "tokio")]
use tokio::io::unix::AsyncFd;

/// An asynchronous stream of input events.
Expand Down Expand Up @@ -796,10 +799,19 @@ mod tokio_stream {
}

/// Returns a mutable reference to the underlying device.
#[cfg(feature = "tokio")]
pub fn device_mut(&mut self) -> &mut RawDevice {
self.device.get_mut()
}

/// Returns a mutable reference to the underlying device.
/// # Safety
/// Must not drop the mutable reference with mem::swap() or mem::take().
#[cfg(feature = "async-io")]
pub unsafe fn device_mut_nodrop(&mut self) -> &mut RawDevice {
unsafe { self.device.get_mut() }
}

/// Try to wait for the next event in this stream. Any errors are likely to be fatal, i.e.
/// any calls afterwards will likely error as well.
pub async fn next_event(&mut self) -> io::Result<InputEvent> {
Expand All @@ -813,14 +825,23 @@ mod tokio_stream {
self.index += 1;
return Poll::Ready(Ok(InputEvent::from(ev)));
}

self.device.get_mut().event_buf.clear();
#[allow(unused_unsafe)] // async-io requires unsafe, tokio does not
unsafe { self.device.get_mut().event_buf.clear() };
self.index = 0;

loop {
let mut guard = ready!(self.device.poll_read_ready_mut(cx))?;

let res = guard.try_io(|device| device.get_mut().fill_events());
let res = {
#[cfg(feature = "tokio")]
{
let mut guard = ready!(self.device.poll_read_ready_mut(cx))?;
guard.try_io(|device| device.get_mut().fill_events())
}
#[cfg(feature = "async-io")]
{
ready!(self.device.poll_readable(cx))?;
unsafe { io::Result::Ok(self.device.get_mut().fill_events()) }
}
};
match res {
Ok(res) => {
let _ = res?;
Expand All @@ -844,5 +865,5 @@ mod tokio_stream {
}
}
}
#[cfg(feature = "tokio")]
pub use tokio_stream::EventStream;
#[cfg(any(feature = "tokio", feature = "async-io"))]
pub use async_stream::EventStream;
40 changes: 31 additions & 9 deletions src/sync_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ impl Device {
})
}

#[cfg(feature = "tokio")]
#[cfg(any(feature = "tokio", feature = "async-io"))]
pub fn into_event_stream(self) -> io::Result<EventStream> {
EventStream::new(self)
}
Expand Down Expand Up @@ -779,12 +779,15 @@ impl fmt::Display for Device {
}
}

#[cfg(feature = "tokio")]
mod tokio_stream {
#[cfg(any(feature = "tokio", feature = "async-io"))]
mod async_stream {
use super::*;

#[cfg(feature = "async-io")]
use async_io::Async as AsyncFd;
use std::future::poll_fn;
use std::task::{ready, Context, Poll};
#[cfg(feature = "tokio")]
use tokio::io::unix::AsyncFd;

/// An asynchronous stream of input events.
Expand Down Expand Up @@ -819,10 +822,19 @@ mod tokio_stream {
}

/// Returns a mutable reference to the underlying device
#[cfg(feature = "tokio")]
pub fn device_mut(&mut self) -> &mut Device {
self.device.get_mut()
}

/// Returns a mutable reference to the underlying device.
/// # Safety
/// Must not drop the mutable reference with mem::swap() or mem::take().
#[cfg(feature = "async-io")]
pub unsafe fn device_mut_nodrop(&mut self) -> &mut Device {
unsafe { self.device.get_mut() }
}

/// Try to wait for the next event in this stream. Any errors are likely to be fatal, i.e.
/// any calls afterwards will likely error as well.
pub async fn next_event(&mut self) -> io::Result<InputEvent> {
Expand All @@ -832,7 +844,8 @@ mod tokio_stream {
/// A lower-level function for directly polling this stream.
pub fn poll_event(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<InputEvent>> {
'outer: loop {
let dev = self.device.get_mut();
#[allow(unused_unsafe)] // async-io requires unsafe, tokio does not
let dev = unsafe { self.device.get_mut() };
if let Some(ev) = compensate_events(&mut self.sync, dev) {
return Poll::Ready(Ok(ev));
}
Expand All @@ -856,9 +869,18 @@ mod tokio_stream {
self.consumed_to = 0;

loop {
let mut guard = ready!(self.device.poll_read_ready_mut(cx))?;

let res = guard.try_io(|device| device.get_mut().fetch_events_inner());
let res = {
#[cfg(feature = "tokio")]
{
let mut guard = ready!(self.device.poll_read_ready_mut(cx))?;
guard.try_io(|device| device.get_mut().fetch_events_inner())
}
#[cfg(feature = "async-io")]
{
ready!(self.device.poll_readable(cx))?;
unsafe { io::Result::Ok(self.device.get_mut().fetch_events_inner()) }
}
};
match res {
Ok(res) => {
self.sync = res?;
Expand All @@ -883,8 +905,8 @@ mod tokio_stream {
}
}
}
#[cfg(feature = "tokio")]
pub use tokio_stream::EventStream;
#[cfg(any(feature = "tokio", feature = "async-io"))]
pub use async_stream::EventStream;

#[cfg(test)]
mod tests {
Expand Down
67 changes: 54 additions & 13 deletions src/uinput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,13 @@ impl VirtualDevice {
}

/// Get the syspaths of the corresponding device nodes in /dev/input.
#[cfg(feature = "tokio")]
#[cfg(any(feature = "tokio", feature = "async-io"))]
pub async fn enumerate_dev_nodes(&mut self) -> io::Result<DevNodes> {
let path = self.get_syspath()?;
#[cfg(feature = "tokio")]
let dir = tokio::fs::read_dir(path).await?;
#[cfg(feature = "async-io")]
let dir = async_fs::read_dir(path).await?;

Ok(DevNodes { dir })
}
Expand Down Expand Up @@ -401,7 +404,7 @@ impl VirtualDevice {
Ok(self.event_buf.drain(..).map(InputEvent::from))
}

#[cfg(feature = "tokio")]
#[cfg(any(feature = "tokio", feature = "async-io"))]
#[inline]
pub fn into_event_stream(self) -> io::Result<VirtualEventStream> {
VirtualEventStream::new(self)
Expand Down Expand Up @@ -445,16 +448,31 @@ impl Iterator for DevNodesBlocking {
/// This struct is returned from the [VirtualDevice::enumerate_dev_nodes_blocking] function and
/// will yield the syspaths corresponding to the virtual device. These are of the form
/// `/dev/input123`.
#[cfg(feature = "tokio")]
#[cfg(any(feature = "tokio", feature = "async-io"))]
pub struct DevNodes {
#[cfg(feature = "tokio")]
dir: tokio::fs::ReadDir,
#[cfg(feature = "async-io")]
dir: async_fs::ReadDir,
}

#[cfg(feature = "tokio")]
#[cfg(any(feature = "tokio", feature = "async-io"))]
impl DevNodes {
/// Returns the next entry in the set of device nodes.
pub async fn next_entry(&mut self) -> io::Result<Option<PathBuf>> {
while let Some(entry) = self.dir.next_entry().await? {
while let Some(entry) = {
#[cfg(feature = "tokio")]
{
self.dir.next_entry().await?
}
#[cfg(feature = "async-io")]
{
match futures_lite::StreamExt::next(&mut self.dir).await {
Some(v) => Some(v?),
None => None,
}
}
} {
// Map the directory name to its file name.
let file_name = entry.file_name();

Expand Down Expand Up @@ -562,12 +580,16 @@ impl Drop for FFEraseEvent {
}
}

#[cfg(feature = "tokio")]
mod tokio_stream {
#[cfg(any(feature = "tokio", feature = "async-io"))]
mod async_stream {
use super::*;

use std::future::poll_fn;
use std::task::{ready, Context, Poll};

#[cfg(feature = "async-io")]
use async_io::Async as AsyncFd;
#[cfg(feature = "tokio")]
use tokio::io::unix::AsyncFd;

/// An asynchronous stream of input events.
Expand Down Expand Up @@ -596,10 +618,19 @@ mod tokio_stream {
}

/// Returns a mutable reference to the underlying device.
#[cfg(feature = "tokio")]
pub fn device_mut(&mut self) -> &mut VirtualDevice {
self.device.get_mut()
}

/// Returns a mutable reference to the underlying device.
/// # Safety
/// Must not drop the mutable reference with mem::swap() or mem::take().
#[cfg(feature = "async-io")]
pub unsafe fn device_mut_nodrop(&mut self) -> &mut VirtualDevice {
unsafe { self.device.get_mut() }
}

/// Try to wait for the next event in this stream. Any errors are likely to be fatal, i.e.
/// any calls afterwards will likely error as well.
pub async fn next_event(&mut self) -> io::Result<InputEvent> {
Expand All @@ -613,14 +644,24 @@ mod tokio_stream {
self.index += 1;
return Poll::Ready(Ok(InputEvent::from(ev)));
}

self.device.get_mut().event_buf.clear();
#[allow(unused_unsafe)] // async-io requires unsafe, tokio does not
unsafe { self.device.get_mut().event_buf.clear() };
self.index = 0;

loop {
let mut guard = ready!(self.device.poll_read_ready_mut(cx))?;
let res = {
#[cfg(feature = "tokio")]
{
let mut guard = ready!(self.device.poll_read_ready_mut(cx))?;
guard.try_io(|device| device.get_mut().fill_events())
}
#[cfg(feature = "async-io")]
{
ready!(self.device.poll_readable(cx))?;
unsafe { io::Result::Ok(self.device.get_mut().fill_events()) }
}
};

let res = guard.try_io(|device| device.get_mut().fill_events());
match res {
Ok(res) => {
let _ = res?;
Expand All @@ -644,5 +685,5 @@ mod tokio_stream {
}
}
}
#[cfg(feature = "tokio")]
pub use tokio_stream::VirtualEventStream;
#[cfg(any(feature = "tokio", feature = "async-io"))]
pub use async_stream::VirtualEventStream;
Loading