
Conversation

@parmesant (Contributor) commented Dec 15, 2025

Add backend for SSE

Fixes #XXXX.

Description


This PR has:

  • been tested to ensure log ingestion and log query work.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features

    • Real-time Server-Sent Events (SSE) broadcasting to connected clients.
    • Alert notifications with severity levels (Info, Warn, Error).
    • Control-plane events for system-level messages and session-targeted delivery.
    • Session-aware client management with automatic pruning of stale connections.
  • Chores

    • Updated runtime dependencies.


@coderabbitai bot commented Dec 15, 2025

Walkthrough

Adds a new SSE broadcasting module with session-aware client registration, message types, broadcasting, and ping-based pruning of stale clients; exposes it via pub mod sse; and updates Cargo dependencies to add actix-web-lab and bump tokio-stream.

Changes

  • Dependency updates (Cargo.toml): Added actix-web-lab = "0.24.3"; upgraded tokio-stream from "0.1" to "0.1.17" (retains the ["fs"] feature).
  • Public API export (src/lib.rs): Exported new public module: pub mod sse.
  • SSE broadcasting system (src/sse/mod.rs): New SSE module: Broadcaster (an Arc + async RwLock-wrapped map from Ulid session → client sender lists), Broadcaster::create, new_client, fetch_sessions, broadcast, a ping loop (10s) that pings clients and prunes stale connections, the register_sse_client Actix handler, SSE event/message types (SSEEvent, Message, Criticality, SSEAlertInfo, ControlPlaneEvent), and a static SSE_HANDLER (Lazy<Arc>). A rough sketch of this shape follows the list.
  • Alert integration (src/alerts/mod.rs): After notifying targets, collects active sessions, filters them by authorization, clones the message into context, and broadcasts SSE alert payloads (id, state, message) to matching sessions using SSE_HANDLER.
  • Session API extension (src/rbac/map.rs): Added pub fn get_active_sessions(&self) -> Vec<SessionKey>; imported itertools::Itertools to use collect_vec.
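
For orientation, here is a minimal sketch of the broadcaster state described above, assembled from this summary and the code fragments quoted later in the review; the struct and field names are assumptions rather than the PR's exact code.

use std::{collections::HashMap, sync::Arc, time::Duration};

use actix_web_lab::sse;
use tokio::sync::{mpsc, RwLock};
use ulid::Ulid;

// Assumed shape: one list of channel senders per session id, shared via Arc
// and guarded by an async RwLock so handlers and the ping task can reach it.
struct BroadcasterInner {
    clients: HashMap<Ulid, Vec<mpsc::Sender<sse::Event>>>,
}

pub struct Broadcaster {
    inner: RwLock<BroadcasterInner>,
}

impl Broadcaster {
    /// Build the shared broadcaster and spawn the 10-second ping task.
    pub fn create() -> Arc<Self> {
        let this = Arc::new(Broadcaster {
            inner: RwLock::new(BroadcasterInner {
                clients: HashMap::new(),
            }),
        });
        let pinger = Arc::clone(&this);
        actix_web::rt::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(10));
            loop {
                interval.tick().await;
                pinger.remove_stale_clients().await;
            }
        });
        this
    }

    /// Ping every registered sender and keep only the ones whose receiver is
    /// still alive (the review below also suggests dropping empty sessions).
    async fn remove_stale_clients(&self) {
        let mut guard = self.inner.write().await;
        for senders in guard.clients.values_mut() {
            let mut alive = Vec::with_capacity(senders.len());
            for sender in senders.drain(..) {
                if sender
                    .send(sse::Event::Comment("ping".into()))
                    .await
                    .is_ok()
                {
                    alive.push(sender);
                }
            }
            *senders = alive;
        }
    }
}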

Sequence Diagram(s)

sequenceDiagram
    actor Client
    participant Actix as Actix Handler
    participant Broadcaster
    participant DB as Sessions Map
    participant Stream as SSE Stream

    Client->>Actix: HTTP SSE request (with session)
    Actix->>DB: extract/validate session
    DB-->>Actix: session (Ulid)
    Actix->>Broadcaster: register_sse_client(session)
    Broadcaster->>Broadcaster: Broadcaster::new_client -> create channel, store sender under session
    Broadcaster->>Stream: return ReceiverStream as Sse
    Stream-->>Client: SSE connection established

    Note over Broadcaster: periodic ping loop (every 10s)
    participant Alerts as Alerts module
    Alerts->>DB: get_active_sessions()
    DB-->>Alerts: list of session keys
    Alerts->>Broadcaster: broadcast(alert_payload, sessions?)
    Broadcaster->>Stream: send events to matching client senders
    Stream-->>Client: event arrives

    alt Unresponsive client
        Broadcaster->>Broadcaster: ping fails -> remove stale sender
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Inspect concurrency and ownership around Arc + async RwLock + channel senders.
  • Verify ping/prune correctness and handling of send failures.
  • Confirm serialization (Serialize/Deserialize) and camelCase field renaming for SSE payloads (see the payload sketch after this list).
  • Check Actix handler register_sse_client for correct session extraction and panic points (unwraps).
  • Review use of itertools and the public API exposure of sessions.
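
As a reference for that serialization check, here is a hedged illustration of camelCase-renamed SSE payload types; the field list (id, state, message) and the Criticality variants come from this review, while the derive set, the rename attribute placement, and the String stand-in for AlertState are assumptions.

use serde::{Deserialize, Serialize};
use ulid::Ulid; // needs the ulid crate's "serde" feature

// Severity levels named in the PR summary.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum Criticality {
    Info,
    Warn,
    Error,
}

// Alert payload pushed to clients; `state` is a plain String here as a
// stand-in for the crate's AlertState type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SSEAlertInfo {
    pub id: Ulid,
    pub state: String,
    pub message: String,
}

With rename_all = "camelCase", any multi-word Rust field names would serialize as camelCase JSON keys, which is what the review item above asks to verify.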

Poem

🐰 I nibble on code and hum soft pings,

I stitch sessions with tiny wings,
Alerts hop out on breezy streams,
I prune the stale and guard the beams. ✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning, 1 inconclusive)
  • Description check (⚠️ Warning): The PR description is largely incomplete, with only placeholder text and unchecked checklist items; the actual goal, solution rationale, and key changes are not filled out. Resolution: complete the description sections, explaining the SSE goal, the implementation approach, and the key changes (Broadcaster, ping loop, alert broadcasting), and update the verification checklist accordingly.
  • Title check (❓ Inconclusive): The title "feat: SSE" is vague and generic, using a non-descriptive abbreviation without context about what SSE functionality is being added. Resolution: expand the title to clarify the scope, e.g. "feat: add SSE broadcaster and alert integration".
✅ Passed checks (1 passed)
  • Docstring Coverage (✅ Passed): Docstring coverage is 80.00%, which meets the required threshold of 80.00%.

@coderabbitai bot left a comment

Actionable comments posted: 4

🧹 Nitpick comments (1)
src/sse/mod.rs (1)

56-76: Consider cleaning up empty sessions.

When all clients for a session disconnect, the session key remains in the map with an empty Vec. This can lead to accumulation of stale session entries over time.

Apply this diff to filter out empty sessions:

         for (session, clients) in sse_inner.iter() {
             let mut ok_clients = Vec::new();
             for client in clients {
                 if client
                     .send(sse::Event::Comment("ping".into()))
                     .await
                     .is_ok()
                 {
                     ok_clients.push(client.clone())
                 }
             }
-            ok_sessions.insert(*session, ok_clients);
+            if !ok_clients.is_empty() {
+                ok_sessions.insert(*session, ok_clients);
+            }
         }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5c68c3f and 6ce3508.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (3)
  • Cargo.toml (2 hunks)
  • src/lib.rs (1 hunks)
  • src/sse/mod.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-06-18T06:39:04.775Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1340
File: src/query/mod.rs:64-66
Timestamp: 2025-06-18T06:39:04.775Z
Learning: In src/query/mod.rs, QUERY_SESSION_STATE and QUERY_SESSION serve different architectural purposes: QUERY_SESSION_STATE is used for stats calculation and allows dynamic registration of individual parquet files from the staging path (files created every minute), while QUERY_SESSION is used for object store queries with the global schema provider. Session contexts with schema providers don't support registering individual tables/parquets, so both session objects are necessary for their respective use cases.

Applied to files:

  • src/lib.rs
🧬 Code graph analysis (1)
src/sse/mod.rs (1)
src/utils/actix.rs (1)
  • extract_session_key_from_req (51-71)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-apple-darwin
🔇 Additional comments (4)
src/lib.rs (1)

45-45: LGTM!

The new sse module is correctly exposed and follows the existing alphabetical ordering convention.

Cargo.toml (2)

87-87: LGTM!

Pinning tokio-stream to 0.1.17 for reproducibility is reasonable.


33-33: The actix-web-lab version 0.24.3 is the latest stable release and is compatible with actix-web 4.9.0. No changes needed.

src/sse/mod.rs (1)

20-40: LGTM!

The Broadcaster design is appropriate: Arc for shared ownership across handlers, an async RwLock for read-write access, and a cleanup loop spawned on creation.
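
To make the shared-ownership point concrete, below is a hedged sketch of how such an Arc<Broadcaster> is typically handed to Actix handlers through app data; the /sse route and server wiring are assumptions (the PR itself exposes a static SSE_HANDLER), and only the web::Data<Arc<Broadcaster>> extractor and register_sse_client handler appear in the actual changes.

use std::sync::Arc;

use actix_web::{web, App, HttpServer};

// Illustrative wiring only: each Actix worker clones the same Arc, so the
// handler's web::Data<Arc<Broadcaster>> extractor resolves to shared state.
async fn run(broadcaster: Arc<Broadcaster>) -> std::io::Result<()> {
    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(Arc::clone(&broadcaster)))
            .route("/sse", web::get().to(register_sse_client))
    })
    .bind(("0.0.0.0", 8000))?
    .run()
    .await
}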

Comment on lines +87 to +91
if let Some(clients) = self.inner.write().await.clients.get_mut(session) {
clients.push(tx);
} else {
self.inner.write().await.clients.insert(*session, vec![tx]);
}

⚠️ Potential issue | 🟠 Major

Fix race condition with double write lock acquisition.

The code acquires a write lock on line 87, checks if the session exists, then drops the lock. In the else branch on line 90, it acquires another write lock. Between these two acquisitions, another concurrent call to new_client for the same session could insert an entry, leading to lost client registrations.

Use the Entry API to handle this atomically:

-        if let Some(clients) = self.inner.write().await.clients.get_mut(session) {
-            clients.push(tx);
-        } else {
-            self.inner.write().await.clients.insert(*session, vec![tx]);
-        }
+        self.inner
+            .write()
+            .await
+            .clients
+            .entry(*session)
+            .or_default()
+            .push(tx);
🤖 Prompt for AI Agents
In src/sse/mod.rs around lines 87-91, avoid acquiring the write lock twice by
holding a single write lock and using the HashMap Entry API to insert-or-get the
client Vec atomically: obtain let mut guard = self.inner.write().await once,
then call guard.clients.entry(key).or_insert_with(Vec::new).push(tx) (use the
appropriate key ownership/clone or dereference as needed) so concurrent
new_client calls cannot race and lose registrations.

src/sse/mod.rs Outdated
Comment on lines 99 to 101
pub async fn broadcast(&self, msg: &str, sessions: Option<&[Ulid]>) {
let clients = self.inner.read().await.clients.clone();
warn!(clients=?clients);

⚠️ Potential issue | 🟡 Minor

Change log level from warn to debug or trace.

Logging all connected clients at warn level on every broadcast will spam logs during normal operation. This appears to be debug logging left over from development.

-        warn!(clients=?clients);
+        tracing::debug!(clients=?clients);

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/sse/mod.rs around lines 99 to 101, the code currently logs connected
clients at warn level (warn!(clients=?clients)); change this to a lower
verbosity (use debug! or trace! instead of warn!) so normal broadcasts don't
spam logs; simply replace the warn! macro with debug! (or trace! if you prefer
more verbosity) and keep the same structured field formatting (clients=?clients)
so only the log level changes.

Comment on lines +130 to +140
pub async fn register_sse_client(
broadcaster: actix_web::web::Data<Arc<Broadcaster>>,
req: HttpRequest,
) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
let session = extract_session_key_from_req(&req).unwrap();
let sessionid = match session {
SessionKey::SessionId(ulid) => ulid,
_ => unreachable!("SSE requires a valid session. Unable to register client."),
};
broadcaster.new_client(&sessionid).await
}

⚠️ Potential issue | 🔴 Critical

Replace panics with proper error handling.

This function will panic on unauthenticated requests (.unwrap() on line 134) or when a user authenticates via BasicAuth instead of session cookie (unreachable!() on line 137). HTTP handlers should not panic; they should return appropriate error responses.

Consider returning a Result or an Actix error response:

-pub async fn register_sse_client(
+pub async fn register_sse_client(
     broadcaster: actix_web::web::Data<Arc<Broadcaster>>,
     req: HttpRequest,
-) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
-    let session = extract_session_key_from_req(&req).unwrap();
-    let sessionid = match session {
-        SessionKey::SessionId(ulid) => ulid,
-        _ => unreachable!("SSE requires a valid session. Unable to register client."),
-    };
-    broadcaster.new_client(&sessionid).await
+) -> Result<Sse<InfallibleStream<ReceiverStream<sse::Event>>>, actix_web::Error> {
+    let session = extract_session_key_from_req(&req)?;
+    let sessionid = match session {
+        SessionKey::SessionId(ulid) => ulid,
+        SessionKey::BasicAuth { .. } => {
+            return Err(actix_web::error::ErrorBadRequest(
+                "SSE requires session-based authentication, not BasicAuth",
+            ));
+        }
+    };
+    Ok(broadcaster.new_client(&sessionid).await)
 }
🤖 Prompt for AI Agents
In src/sse/mod.rs around lines 130–140, replace the panic-causing unwrap() and
unreachable!() with proper error handling and change the handler to return a
Result so unauthorized/invalid requests produce HTTP errors instead of panics:
update the function signature to return
Result<Sse<InfallibleStream<ReceiverStream<sse::Event>>>, actix_web::Error> (or
Result<_, HttpResponse>), call extract_session_key_from_req and map its None/Err
to an appropriate HttpResponse::Unauthorized/BadRequest, match SessionKey and
return an Err(HttpResponse::Unauthorized().finish()) when not a SessionId, and
finally return Ok(broadcaster.new_client(&sessionid).await) so any broadcaster
errors can be propagated or converted into an HTTP 500 response.

@coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (3)
src/sse/mod.rs (3)

89-93: Fix race condition with double write lock acquisition.

This issue was flagged in a previous review and remains unaddressed. The code acquires a write lock, checks if the session exists, drops the lock, then acquires another write lock in the else branch. A concurrent call could insert between these acquisitions.

Use the Entry API:

-        if let Some(clients) = self.inner.write().await.clients.get_mut(session) {
-            clients.push(tx);
-        } else {
-            self.inner.write().await.clients.insert(*session, vec![tx]);
-        }
+        self.inner
+            .write()
+            .await
+            .clients
+            .entry(*session)
+            .or_default()
+            .push(tx);

141-151: Replace panics with proper error handling.

This issue was flagged in a previous review and remains unaddressed. The unwrap() on line 145 and unreachable!() on line 148 will panic on unauthenticated requests or BasicAuth usage. HTTP handlers should return appropriate error responses instead:

 pub async fn register_sse_client(
     broadcaster: actix_web::web::Data<Arc<Broadcaster>>,
     req: HttpRequest,
-) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
-    let session = extract_session_key_from_req(&req).unwrap();
-    let sessionid = match session {
-        SessionKey::SessionId(ulid) => ulid,
-        _ => unreachable!("SSE requires a valid session. Unable to register client."),
-    };
-    broadcaster.new_client(&sessionid).await
+) -> Result<Sse<InfallibleStream<ReceiverStream<sse::Event>>>, actix_web::Error> {
+    let session = extract_session_key_from_req(&req)?;
+    let sessionid = match session {
+        SessionKey::SessionId(ulid) => ulid,
+        SessionKey::BasicAuth { .. } => {
+            return Err(actix_web::error::ErrorBadRequest(
+                "SSE requires session-based authentication, not BasicAuth",
+            ));
+        }
+    };
+    Ok(broadcaster.new_client(&sessionid).await)
 }

184-188: ControlPlaneEvent.message field is still private.

A previous review noted that private fields prevent external instantiation. While SSEAlertInfo fields were made public, ControlPlaneEvent.message remains private:

 pub struct ControlPlaneEvent {
-    message: String,
+    pub message: String,
 }
🧹 Nitpick comments (1)
src/sse/mod.rs (1)

57-78: Consider avoiding full HashMap clone in remove_stale_clients.

Cloning the entire clients HashMap on line 59 could become expensive with many connected clients. Consider iterating with the write lock held and mutating in place, or using retain semantics:

     async fn remove_stale_clients(&self) {
-        let sse_inner = self.inner.read().await.clients.clone();
-
-        let mut ok_sessions = HashMap::new();
-
-        for (session, clients) in sse_inner.iter() {
-            let mut ok_clients = Vec::new();
-            for client in clients {
-                if client
-                    .send(sse::Event::Comment("ping".into()))
-                    .await
-                    .is_ok()
-                {
-                    ok_clients.push(client.clone())
+        let mut guard = self.inner.write().await;
+        for clients in guard.clients.values_mut() {
+            let mut ok_clients = Vec::with_capacity(clients.len());
+            for client in clients.drain(..) {
+                if client.send(sse::Event::Comment("ping".into())).await.is_ok() {
+                    ok_clients.push(client);
                 }
             }
-            ok_sessions.insert(*session, ok_clients);
+            *clients = ok_clients;
         }
-
-        self.inner.write().await.clients = ok_sessions;
+        guard.clients.retain(|_, v| !v.is_empty());
     }

This avoids cloning and removes empty session entries.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6ce3508 and 9e6f085.

📒 Files selected for processing (3)
  • src/alerts/mod.rs (2 hunks)
  • src/rbac/map.rs (2 hunks)
  • src/sse/mod.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (6)
📚 Learning: 2025-07-24T11:09:21.781Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1388
File: src/alerts/mod.rs:88-104
Timestamp: 2025-07-24T11:09:21.781Z
Learning: In the Parseable alert system (src/alerts/mod.rs), alert versions are server-generated and controlled via CURRENT_ALERTS_VERSION constant, not user input. The AlertVerison enum's From<&str> implementation correctly defaults unknown versions to V2 since the server only generates known versions (v1, v2). Unknown versions would only occur in exceptional cases like file corruption, making the current fallback approach appropriate.

Applied to files:

  • src/alerts/mod.rs
📚 Learning: 2025-06-18T06:39:04.775Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1340
File: src/query/mod.rs:64-66
Timestamp: 2025-06-18T06:39:04.775Z
Learning: In src/query/mod.rs, QUERY_SESSION_STATE and QUERY_SESSION serve different architectural purposes: QUERY_SESSION_STATE is used for stats calculation and allows dynamic registration of individual parquet files from the staging path (files created every minute), while QUERY_SESSION is used for object store queries with the global schema provider. Session contexts with schema providers don't support registering individual tables/parquets, so both session objects are necessary for their respective use cases.

Applied to files:

  • src/alerts/mod.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/alerts/mod.rs
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).

Applied to files:

  • src/alerts/mod.rs
📚 Learning: 2025-09-18T09:52:07.554Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/storage/object_storage.rs:173-177
Timestamp: 2025-09-18T09:52:07.554Z
Learning: In Parseable's upload system (src/storage/object_storage.rs), the update_storage_metrics function can safely use path.metadata().map_err() to fail on local file metadata read failures because parquet validation (validate_uploaded_parquet_file) ensures file integrity before this step, and the system guarantees local staging files remain accessible throughout the upload flow.

Applied to files:

  • src/alerts/mod.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.

Applied to files:

  • src/alerts/mod.rs
🧬 Code graph analysis (2)
src/alerts/mod.rs (2)
src/rbac/map.rs (1)
  • sessions (92-98)
src/utils/mod.rs (1)
  • user_auth_for_query (78-87)
src/sse/mod.rs (1)
src/utils/actix.rs (1)
  • extract_session_key_from_req (51-71)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-pc-windows-msvc
🔇 Additional comments (2)
src/rbac/map.rs (1)

172-174: LGTM!

The get_active_sessions method cleanly exposes the session keys needed for SSE broadcasting. The implementation correctly clones keys to avoid holding the lock longer than necessary.
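
A rough sketch of what that accessor amounts to, assuming a map keyed by SessionKey; only the signature and the collect_vec call are taken from the PR, the surrounding struct and field names are placeholders.

use std::collections::HashMap;

use itertools::Itertools;

// Placeholder store; SessionKey is the crate's existing session key type and
// the `active` field name is an assumption for this sketch.
pub struct Sessions {
    active: HashMap<SessionKey, ()>,
}

impl Sessions {
    // Clone the keys out so callers can run per-session authorization checks
    // without keeping a borrow of (or lock on) the sessions map.
    pub fn get_active_sessions(&self) -> Vec<SessionKey> {
        self.active.keys().cloned().collect_vec()
    }
}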

src/alerts/mod.rs (1)

617-636: SSE broadcast integration looks good.

The authorization check correctly filters sessions to only broadcast alerts to users who have access to the alert's query. The silent skip on serialization failure (line 629) is acceptable since SSEAlertInfo serialization is unlikely to fail.

) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
let (tx, rx) = mpsc::channel(10);

tx.send(sse::Data::new("connected").into()).await.unwrap();

⚠️ Potential issue | 🟡 Minor

Avoid unwrap() on initial send.

The unwrap() will panic if the channel is already closed. While unlikely immediately after creation, it's safer to handle this gracefully:

-        tx.send(sse::Data::new("connected").into()).await.unwrap();
+        let _ = tx.send(sse::Data::new("connected").into()).await;

The client won't receive the "connected" message if the send fails, but the connection will still be registered and subsequent broadcasts will work (or get cleaned up by the ping loop).

🤖 Prompt for AI Agents
In src/sse/mod.rs around line 87, the call tx.send(...).await.unwrap() can panic
if the channel is closed; change it to handle the Result from await instead of
unwrapping: await the send, match or if let on the Result, and on Err just
log/debug/ignore the failure and continue (do not return or panic) so the
connection is still registered and normal cleanup/broadcast behavior can
proceed.

@coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
src/alerts/mod.rs (3)

610-610: Good optimization using clone_from.

Using clone_from instead of direct assignment with clone() can be more efficient as it may reuse existing allocations.
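
For context, a tiny generic-Rust illustration of the difference (not the PR's code): clone_from copies into the destination's existing buffer when it can, whereas assigning a fresh clone always allocates a new one.

fn main() {
    let src = String::from("alert fired");

    // `dst = src.clone()` would build a brand-new String and drop the old
    // buffer; clone_from reuses dst's allocation since its capacity suffices.
    let mut dst = String::with_capacity(64);
    dst.clone_from(&src);

    assert_eq!(dst, "alert fired");
}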


618-627: Consider parallelizing authorization checks for better performance.

The code sequentially iterates through all active sessions and performs async authorization checks. If there are many active sessions, this could introduce noticeable latency in the alert notification path.

Consider parallelizing the authorization checks similar to the pattern used in list_alerts_for_user (lines 1137-1155):

-        let active_sessions = sessions().get_active_sessions();
-        let mut broadcast_to = vec![];
-        for session in active_sessions {
-            if user_auth_for_query(&session, &self.query).await.is_ok()
-                && let SessionKey::SessionId(id) = &session
-            {
-                broadcast_to.push(*id);
-            }
-        }
+        let active_sessions = sessions().get_active_sessions();
+        let auth_futures: Vec<_> = active_sessions
+            .into_iter()
+            .map(|session| async move {
+                if user_auth_for_query(&session, &self.query).await.is_ok() {
+                    if let SessionKey::SessionId(id) = session {
+                        Some(id)
+                    } else {
+                        None
+                    }
+                } else {
+                    None
+                }
+            })
+            .collect();
+        let broadcast_to: Vec<_> = futures::future::join_all(auth_futures)
+            .await
+            .into_iter()
+            .flatten()
+            .collect();

629-635: Add error logging for SSE serialization and broadcast failures.

The code silently ignores serialization errors and doesn't handle potential broadcast failures. This could make debugging SSE issues difficult.

Apply this diff to add error logging:

-        if let Ok(msg) = &serde_json::to_string(&SSEAlertInfo {
+        match serde_json::to_string(&SSEAlertInfo {
             id: self.id,
             state: self.state,
             message,
-        }) {
+        }) {
+            Ok(msg) => {
-            SSE_HANDLER.broadcast(msg, Some(&broadcast_to)).await;
+                SSE_HANDLER.broadcast(&msg, Some(&broadcast_to)).await;
+            }
+            Err(e) => {
+                error!("Failed to serialize SSE alert info for alert {}: {}", self.id, e);
+            }
         }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9e6f085 and 8a35e0d.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (5)
  • Cargo.toml (2 hunks)
  • src/alerts/mod.rs (2 hunks)
  • src/lib.rs (1 hunks)
  • src/rbac/map.rs (2 hunks)
  • src/sse/mod.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • Cargo.toml
  • src/rbac/map.rs
  • src/sse/mod.rs
  • src/lib.rs
🧰 Additional context used
🧠 Learnings (6)
📚 Learning: 2025-07-24T11:09:21.781Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1388
File: src/alerts/mod.rs:88-104
Timestamp: 2025-07-24T11:09:21.781Z
Learning: In the Parseable alert system (src/alerts/mod.rs), alert versions are server-generated and controlled via CURRENT_ALERTS_VERSION constant, not user input. The AlertVerison enum's From<&str> implementation correctly defaults unknown versions to V2 since the server only generates known versions (v1, v2). Unknown versions would only occur in exceptional cases like file corruption, making the current fallback approach appropriate.

Applied to files:

  • src/alerts/mod.rs
📚 Learning: 2025-06-18T06:39:04.775Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1340
File: src/query/mod.rs:64-66
Timestamp: 2025-06-18T06:39:04.775Z
Learning: In src/query/mod.rs, QUERY_SESSION_STATE and QUERY_SESSION serve different architectural purposes: QUERY_SESSION_STATE is used for stats calculation and allows dynamic registration of individual parquet files from the staging path (files created every minute), while QUERY_SESSION is used for object store queries with the global schema provider. Session contexts with schema providers don't support registering individual tables/parquets, so both session objects are necessary for their respective use cases.

Applied to files:

  • src/alerts/mod.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/alerts/mod.rs
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).

Applied to files:

  • src/alerts/mod.rs
📚 Learning: 2025-09-18T09:52:07.554Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/storage/object_storage.rs:173-177
Timestamp: 2025-09-18T09:52:07.554Z
Learning: In Parseable's upload system (src/storage/object_storage.rs), the update_storage_metrics function can safely use path.metadata().map_err() to fail on local file metadata read failures because parquet validation (validate_uploaded_parquet_file) ensures file integrity before this step, and the system guarantees local staging files remain accessible throughout the upload flow.

Applied to files:

  • src/alerts/mod.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.

Applied to files:

  • src/alerts/mod.rs
🧬 Code graph analysis (1)
src/alerts/mod.rs (2)
src/rbac/map.rs (1)
  • sessions (92-98)
src/utils/mod.rs (1)
  • user_auth_for_query (78-87)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
🔇 Additional comments (2)
src/alerts/mod.rs (2)

618-636: SSE types and handler method signatures are correct.

Verification confirms SSEAlertInfo struct has the expected fields (id: Ulid, state: AlertState, message: String) and SSE_HANDLER.broadcast(&self, msg: &str, sessions: Option<&[Ulid]>) accepts the correct parameter types. The code usage is valid.
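
Put together, the call site is shaped roughly like the snippet below, assembled from the signatures quoted above rather than copied from src/alerts/mod.rs; `message` and `broadcast_to` are the locals the surrounding review describes.

// Inside the alert's notification path (illustrative): serialize the payload
// and push it only to the sessions that passed the authorization filter.
let payload = SSEAlertInfo {
    id: self.id,
    state: self.state,
    message,
};

if let Ok(msg) = serde_json::to_string(&payload) {
    SSE_HANDLER.broadcast(&msg, Some(&broadcast_to)).await;
}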


65-66: Imports are correct. Both SSE_HANDLER and SSEAlertInfo are properly exported from src/sse/mod.rs.
