diff --git a/Cargo.lock b/Cargo.lock index 1d193c68d..43cda6fc4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -36,26 +36,26 @@ dependencies = [ [[package]] name = "actix-http" -version = "3.9.0" +version = "3.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d48f96fc3003717aeb9856ca3d02a8c7de502667ad76eeacd830b48d2e91fac4" +checksum = "7926860314cbe2fb5d1f13731e387ab43bd32bca224e82e6e2db85de0a3dba49" dependencies = [ "actix-codec", "actix-rt", "actix-service", "actix-tls", "actix-utils", - "ahash", "base64 0.22.1", "bitflags 2.8.0", - "brotli 6.0.0", + "brotli 8.0.2", "bytes", "bytestring", - "derive_more 0.99.18", + "derive_more 2.1.0", "encoding_rs", "flate2", + "foldhash", "futures-core", - "h2 0.3.26", + "h2 0.3.27", "http 0.2.12", "httparse", "httpdate", @@ -65,7 +65,7 @@ dependencies = [ "mime", "percent-encoding", "pin-project-lite", - "rand 0.8.5", + "rand 0.9.1", "sha1", "smallvec", "tokio", @@ -236,6 +236,54 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "actix-web-lab" +version = "0.24.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fde7e471db19a782577913d5fb56974fd247c5c841c56069632d4ea353f88e3" +dependencies = [ + "actix-http", + "actix-router", + "actix-service", + "actix-utils", + "actix-web", + "actix-web-lab-derive", + "ahash", + "arc-swap", + "bytes", + "bytestring", + "csv", + "derive_more 2.1.0", + "form_urlencoded", + "futures-core", + "futures-util", + "http 0.2.12", + "impl-more", + "itertools 0.14.0", + "local-channel", + "mime", + "pin-project-lite", + "regex", + "serde", + "serde_html_form", + "serde_json", + "serde_path_to_error", + "tokio", + "tokio-stream", + "tracing", + "url", +] + +[[package]] +name = "actix-web-lab-derive" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd80fa0bd6217e482112d9d87a05af8e0f8dec9e3aa51f34816f761c5cf7da7" +dependencies = [ + "quote", + "syn 2.0.108", +] + [[package]] name = "actix-web-prometheus" version = "0.1.2" @@ -399,6 +447,12 @@ dependencies = [ "derive_arbitrary", ] +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "argon2" version = "0.5.3" @@ -908,17 +962,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "brotli" -version = "6.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b" -dependencies = [ - "alloc-no-stdlib", - "alloc-stdlib", - "brotli-decompressor 4.0.2", -] - [[package]] name = "brotli" version = "7.0.0" @@ -1285,6 +1328,15 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "convert_case" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "633458d4ef8c78b72454de2d54fd6ab2e60f9e02be22f3c6104cdc8a4e0fceb9" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "cookie" version = "0.16.2" @@ -2210,7 +2262,16 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05" dependencies = [ - "derive_more-impl", + "derive_more-impl 1.0.0", +] + +[[package]] +name = "derive_more" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10b768e943bed7bf2cab53df09f4bc34bfd217cdb57d971e769874c9a6710618" +dependencies = [ + "derive_more-impl 2.1.0", ] [[package]] @@ -2226,6 +2287,20 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "derive_more-impl" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d286bfdaf75e988b4a78e013ecd79c581e06399ab53fbacd2d916c2f904f30b" +dependencies = [ + "convert_case 0.10.0", + "proc-macro2", + "quote", + "rustc_version", + "syn 2.0.108", + "unicode-xid", +] + [[package]] name = "digest" version = "0.10.7" @@ -2540,9 +2615,9 @@ checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" [[package]] name = "h2" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" +checksum = "0beca50380b1fc32983fc1cb4587bfa4bb9e78fc259aad4a0032d2080309222d" dependencies = [ "bytes", "fnv", @@ -2751,7 +2826,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2 0.3.26", + "h2 0.3.27", "http 0.2.12", "http-body 0.4.6", "httparse", @@ -3766,6 +3841,7 @@ dependencies = [ "actix-cors", "actix-web", "actix-web-httpauth", + "actix-web-lab", "actix-web-prometheus", "actix-web-static-files", "anyhow", @@ -4494,7 +4570,7 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2 0.3.26", + "h2 0.3.27", "http 0.2.12", "http-body 0.4.6", "hyper 0.14.32", @@ -4922,6 +4998,19 @@ dependencies = [ "syn 2.0.108", ] +[[package]] +name = "serde_html_form" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2f2d7ff8a2140333718bb329f5c40fc5f0865b84c426183ce14c97d2ab8154f" +dependencies = [ + "form_urlencoded", + "indexmap", + "itoa", + "ryu", + "serde_core", +] + [[package]] name = "serde_json" version = "1.0.138" @@ -4935,6 +5024,17 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + [[package]] name = "serde_repr" version = "0.1.19" diff --git a/Cargo.toml b/Cargo.toml index f39441af5..495802ee0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ parquet = "57.1.0" # Web server and HTTP-related actix-cors = "0.7.0" +actix-web-lab = "0.24.3" actix-web = { version = "4.9.0", features = ["rustls-0_22"] } actix-web-httpauth = "0.8" actix-web-prometheus = { version = "0.1" } @@ -83,7 +84,7 @@ tokio = { version = "^1.43", default-features = false, features = [ "fs", "rt-multi-thread", ] } -tokio-stream = { version = "0.1", features = ["fs"] } +tokio-stream = { version = "0.1.17", features = ["fs"] } tokio-util = { version = "0.7" } # # Logging and Metrics diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 19a9d1a0b..b26033f12 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -62,7 +62,8 @@ use crate::metastore::MetastoreError; // use crate::option::Mode; use crate::parseable::{PARSEABLE, StreamNotFound}; use crate::query::{QUERY_SESSION, resolve_stream_names}; -use crate::rbac::map::SessionKey; +use crate::rbac::map::{SessionKey, sessions}; +use crate::sse::{SSE_HANDLER, SSEAlertInfo}; use crate::storage; use crate::storage::ObjectStorageError; use crate::sync::alert_runtime; @@ -606,12 +607,33 @@ impl AlertConfig { pub async fn trigger_notifications(&self, message: String) -> Result<(), AlertError> { let mut context = self.get_context(); - context.message = message; + context.message.clone_from(&message); + for target_id in &self.targets { let target = TARGETS.get_target_by_id(target_id).await?; trace!("Target (trigger_notifications)-\n{target:?}"); target.call(context.clone()); } + + // get active sessions + let active_sessions = sessions().get_active_sessions(); + let mut broadcast_to = vec![]; + for session in active_sessions { + if user_auth_for_query(&session, &self.query).await.is_ok() + && let SessionKey::SessionId(id) = &session + { + broadcast_to.push(*id); + } + } + + if let Ok(msg) = &serde_json::to_string(&SSEAlertInfo { + id: self.id, + state: self.state, + message, + }) { + SSE_HANDLER.broadcast(msg, Some(&broadcast_to)).await; + } + Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index 5411ba81e..4bbf629ee 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,6 +42,7 @@ pub mod prism; pub mod query; pub mod rbac; mod response; +pub mod sse; mod static_schema; mod stats; pub mod storage; diff --git a/src/rbac/map.rs b/src/rbac/map.rs index e8836c824..578f8903a 100644 --- a/src/rbac/map.rs +++ b/src/rbac/map.rs @@ -28,6 +28,7 @@ use super::{ user, }; use chrono::{DateTime, Utc}; +use itertools::Itertools; use once_cell::sync::{Lazy, OnceCell}; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -168,6 +169,10 @@ pub struct Sessions { } impl Sessions { + pub fn get_active_sessions(&self) -> Vec { + self.active_sessions.keys().cloned().collect_vec() + } + // only checks if the session is expired or not pub fn is_session_expired(&self, key: &SessionKey) -> bool { // fetch userid from session key diff --git a/src/sse/mod.rs b/src/sse/mod.rs new file mode 100644 index 000000000..45b4d5460 --- /dev/null +++ b/src/sse/mod.rs @@ -0,0 +1,188 @@ +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use actix_web::{HttpRequest, rt::time::interval}; +use actix_web_lab::{ + sse::{self, Sse}, + util::InfallibleStream, +}; +use futures_util::future; + +use itertools::Itertools; +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; +use tokio::sync::{RwLock, mpsc}; +use tokio_stream::wrappers::ReceiverStream; +use ulid::Ulid; + +use crate::{ + alerts::AlertState, rbac::map::SessionKey, utils::actix::extract_session_key_from_req, +}; + +pub static SSE_HANDLER: Lazy> = Lazy::new(Broadcaster::create); +pub struct Broadcaster { + inner: RwLock, +} + +#[derive(Debug, Clone, Default)] +struct BroadcasterInner { + // hashmap to map sse session to prism ui session + clients: HashMap>>, +} + +impl Broadcaster { + /// Constructs new broadcaster and spawns ping loop. + pub fn create() -> Arc { + let this = Arc::new(Broadcaster { + inner: RwLock::new(BroadcasterInner::default()), + }); + + Broadcaster::spawn_ping(Arc::clone(&this)); + + this + } + + /// Pings clients every 10 seconds to see if they are alive and remove them from the broadcast + /// list if not. + fn spawn_ping(this: Arc) { + actix_web::rt::spawn(async move { + let mut interval = interval(Duration::from_secs(10)); + + loop { + interval.tick().await; + this.remove_stale_clients().await; + } + }); + } + + /// Removes all non-responsive clients from broadcast list. + async fn remove_stale_clients(&self) { + let sse_inner = self.inner.read().await.clients.clone(); + + let mut ok_sessions = HashMap::new(); + + for (session, clients) in sse_inner.iter() { + let mut ok_clients = Vec::new(); + for client in clients { + if client + .send(sse::Event::Comment("ping".into())) + .await + .is_ok() + { + ok_clients.push(client.clone()) + } + } + ok_sessions.insert(*session, ok_clients); + } + + self.inner.write().await.clients = ok_sessions; + } + + /// Registers client with broadcaster, returning an SSE response body. + pub async fn new_client( + &self, + session: &Ulid, + ) -> Sse>> { + let (tx, rx) = mpsc::channel(10); + + tx.send(sse::Data::new("connected").into()).await.unwrap(); + + if let Some(clients) = self.inner.write().await.clients.get_mut(session) { + clients.push(tx); + } else { + self.inner.write().await.clients.insert(*session, vec![tx]); + } + + Sse::from_infallible_receiver(rx) + } + + pub async fn fetch_sessions(&self) -> Vec { + self.inner + .read() + .await + .clients + .keys() + .cloned() + .collect_vec() + } + + /// Broadcasts `msg` + /// + /// If sessions is None, then broadcast to all + pub async fn broadcast(&self, msg: &str, sessions: Option<&[Ulid]>) { + let clients = self.inner.read().await.clients.clone(); + + let send_futures = if let Some(sessions) = sessions { + let mut futures = vec![]; + for (session, clients) in clients.iter() { + if sessions.contains(session) { + clients + .iter() + .for_each(|client| futures.push(client.send(sse::Data::new(msg).into()))); + } + } + futures + } else { + // broadcast + let mut futures = vec![]; + for (_, clients) in clients.iter() { + clients + .iter() + .for_each(|client| futures.push(client.send(sse::Data::new(msg).into()))); + } + futures + }; + + // try to send to all clients, ignoring failures + // disconnected clients will get swept up by `remove_stale_clients` + let _ = future::join_all(send_futures).await; + } +} + +pub async fn register_sse_client( + broadcaster: actix_web::web::Data>, + req: HttpRequest, +) -> Sse>> { + let session = extract_session_key_from_req(&req).unwrap(); + let sessionid = match session { + SessionKey::SessionId(ulid) => ulid, + _ => unreachable!("SSE requires a valid session. Unable to register client."), + }; + broadcaster.new_client(&sessionid).await +} + +/// Struct to define the messages being sent using SSE +#[derive(Serialize, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SSEEvent { + pub criticality: Criticality, + pub message: Message, +} + +#[derive(Serialize, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum Message { + AlertEvent(SSEAlertInfo), + ControlPlaneEvent(ControlPlaneEvent), +} + +#[derive(Serialize, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum Criticality { + Info, + Warn, + Error, +} + +#[derive(Serialize, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SSEAlertInfo { + pub id: Ulid, + pub state: AlertState, + pub message: String, +} + +#[derive(Serialize, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ControlPlaneEvent { + message: String, +}