feat: SSE #1487
Conversation
Walkthrough: Adds a new SSE broadcasting module with session-aware client registration, message types, and broadcast/ping-based pruning; exposes it via `src/lib.rs`.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    actor Client
    participant Actix as Actix Handler
    participant Broadcaster
    participant DB as Sessions Map
    participant Stream as SSE Stream
    Client->>Actix: HTTP SSE request (with session)
    Actix->>DB: extract/validate session
    DB-->>Actix: session (Ulid)
    Actix->>Broadcaster: register_sse_client(session)
    Broadcaster->>Broadcaster: Broadcaster::new_client -> create channel, store sender under session
    Broadcaster->>Stream: return ReceiverStream as Sse
    Stream-->>Client: SSE connection established
    Note over Broadcaster: periodic ping loop (every 10s)
    participant Alerts as Alerts module
    Alerts->>DB: get_active_sessions()
    DB-->>Alerts: list of session keys
    Alerts->>Broadcaster: broadcast(alert_payload, sessions?)
    Broadcaster->>Stream: send events to matching client senders
    Stream-->>Client: event arrives
    alt Unresponsive client
        Broadcaster->>Broadcaster: ping fails -> remove stale sender
    end
```
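The flow in the diagram above can be sketched in plain synchronous Rust. This is an illustrative stand-in, not the PR's code: `u64` replaces the Ulid session key, `std::sync::mpsc` replaces the async channels, and the actix/SSE layer is omitted.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical stand-in for the session key (the real code uses a Ulid).
type SessionId = u64;

// Minimal broadcaster: one sender list per session, pruned on failed sends.
struct Broadcaster {
    clients: HashMap<SessionId, Vec<Sender<String>>>,
}

impl Broadcaster {
    fn new() -> Self {
        Broadcaster { clients: HashMap::new() }
    }

    // Register a client under its session and hand back the receiving end.
    fn new_client(&mut self, session: SessionId) -> Receiver<String> {
        let (tx, rx) = channel();
        self.clients.entry(session).or_default().push(tx);
        rx
    }

    // Send `msg` to every client of the listed sessions (or to all sessions).
    fn broadcast(&self, msg: &str, sessions: Option<&[SessionId]>) {
        for (session, senders) in &self.clients {
            if sessions.map_or(true, |s| s.contains(session)) {
                for tx in senders {
                    let _ = tx.send(msg.to_string()); // ignore closed receivers
                }
            }
        }
    }

    // Drop senders whose receiver is gone, then drop empty sessions
    // (the synchronous analogue of the ping loop).
    fn remove_stale_clients(&mut self) {
        for senders in self.clients.values_mut() {
            senders.retain(|tx| tx.send("ping".to_string()).is_ok());
        }
        self.clients.retain(|_, v| !v.is_empty());
    }
}

fn main() {
    let mut b = Broadcaster::new();
    let rx = b.new_client(1);
    b.broadcast("alert", Some(&[1]));
    assert_eq!(rx.recv().unwrap(), "alert");
    drop(rx); // client disconnects
    b.remove_stale_clients();
    assert!(b.clients.is_empty());
    println!("ok");
}
```

The real module does the same bookkeeping behind an `Arc<RwLock<...>>` so actix handlers and the ping task can share it.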
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches: ❌ Failed checks (1 warning, 1 inconclusive); ✅ Passed checks (1 passed).
Actionable comments posted: 4
🧹 Nitpick comments (1)
src/sse/mod.rs (1)
56-76: Consider cleaning up empty sessions. When all clients for a session disconnect, the session key remains in the map with an empty `Vec`. This can lead to accumulation of stale session entries over time. Apply this diff to filter out empty sessions:

```diff
 for (session, clients) in sse_inner.iter() {
     let mut ok_clients = Vec::new();
     for client in clients {
         if client
             .send(sse::Event::Comment("ping".into()))
             .await
             .is_ok()
         {
             ok_clients.push(client.clone())
         }
     }
-    ok_sessions.insert(*session, ok_clients);
+    if !ok_clients.is_empty() {
+        ok_sessions.insert(*session, ok_clients);
+    }
 }
```
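The suggested cleanup amounts to dropping map entries whose client list came out empty after the ping pass. A minimal stand-alone sketch, with `u64` standing in for the Ulid session key and `&str` for the sender handle:

```rust
use std::collections::HashMap;

// Keep only sessions that still have live clients after pruning.
// Types are illustrative stand-ins, not the PR's actual types.
fn prune_empty(sessions: HashMap<u64, Vec<&'static str>>) -> HashMap<u64, Vec<&'static str>> {
    sessions
        .into_iter()
        .filter(|(_, clients)| !clients.is_empty())
        .collect()
}

fn main() {
    let mut m = HashMap::new();
    m.insert(1, vec!["client_a"]);
    m.insert(2, Vec::new()); // all clients for session 2 disconnected
    let pruned = prune_empty(m);
    assert_eq!(pruned.len(), 1);
    assert!(pruned.contains_key(&1));
    println!("ok");
}
```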
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (3)
- `Cargo.toml` (2 hunks)
- `src/lib.rs` (1 hunks)
- `src/sse/mod.rs` (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-06-18T06:39:04.775Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1340
File: src/query/mod.rs:64-66
Timestamp: 2025-06-18T06:39:04.775Z
Learning: In src/query/mod.rs, QUERY_SESSION_STATE and QUERY_SESSION serve different architectural purposes: QUERY_SESSION_STATE is used for stats calculation and allows dynamic registration of individual parquet files from the staging path (files created every minute), while QUERY_SESSION is used for object store queries with the global schema provider. Session contexts with schema providers don't support registering individual tables/parquets, so both session objects are necessary for their respective use cases.
Applied to files:
src/lib.rs
🧬 Code graph analysis (1)
src/sse/mod.rs (1)
src/utils/actix.rs (1)
extract_session_key_from_req(51-71)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: coverage
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-apple-darwin
🔇 Additional comments (4)
src/lib.rs (1)
45-45: LGTM! The new `sse` module is correctly exposed and follows the existing alphabetical ordering convention.
Cargo.toml (2)
87-87: LGTM! Pinning tokio-stream to 0.1.17 for reproducibility is reasonable.
33-33: The `actix-web-lab` version 0.24.3 is the latest stable release and is compatible with actix-web 4.9.0. No changes needed.
src/sse/mod.rs (1)
20-40: LGTM! The `Broadcaster` design is appropriate: `Arc` for shared ownership across handlers, `RwLock` for async read-write access, and spawning a cleanup loop on creation.
```rust
if let Some(clients) = self.inner.write().await.clients.get_mut(session) {
    clients.push(tx);
} else {
    self.inner.write().await.clients.insert(*session, vec![tx]);
}
```
Fix race condition with double write lock acquisition.
The code acquires a write lock on line 87, checks if the session exists, then drops the lock. In the else branch on line 90, it acquires another write lock. Between these two acquisitions, another concurrent call to new_client for the same session could insert an entry, leading to lost client registrations.
Use the Entry API to handle this atomically:
```diff
-    if let Some(clients) = self.inner.write().await.clients.get_mut(session) {
-        clients.push(tx);
-    } else {
-        self.inner.write().await.clients.insert(*session, vec![tx]);
-    }
+    self.inner
+        .write()
+        .await
+        .clients
+        .entry(*session)
+        .or_default()
+        .push(tx);
```
🤖 Prompt for AI Agents
In src/sse/mod.rs around lines 87-91, avoid acquiring the write lock twice by
holding a single write lock and using the HashMap Entry API to insert-or-get the
client Vec atomically: obtain let mut guard = self.inner.write().await once,
then call guard.clients.entry(key).or_insert_with(Vec::new).push(tx) (use the
appropriate key ownership/clone or dereference as needed) so concurrent
new_client calls cannot race and lose registrations.
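The fix boils down to doing the insert-or-push under a single lock acquisition via the `Entry` API. A small synchronous sketch (using `std::sync::RwLock` with `u64`/`&str` stand-ins rather than the PR's async types) shows that concurrent registrations under the same session are not lost:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

// Insert-or-push under one write lock, so two threads registering clients
// for the same session cannot race on a check-then-insert gap.
fn register(map: &Arc<RwLock<HashMap<u64, Vec<&'static str>>>>, session: u64, tx: &'static str) {
    map.write()
        .unwrap()
        .entry(session)
        .or_default()
        .push(tx); // the whole insert-or-push happens atomically under the lock
}

fn main() {
    let map = Arc::new(RwLock::new(HashMap::new()));
    let handles: Vec<_> = (0..8)
        .map(|_| {
            let m = Arc::clone(&map);
            thread::spawn(move || register(&m, 42, "client"))
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // All eight registrations survive; none were lost to the race.
    assert_eq!(map.read().unwrap()[&42].len(), 8);
    println!("ok");
}
```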
src/sse/mod.rs (outdated)
```rust
pub async fn broadcast(&self, msg: &str, sessions: Option<&[Ulid]>) {
    let clients = self.inner.read().await.clients.clone();
    warn!(clients=?clients);
```
Change log level from warn to debug or trace.
Logging all connected clients at warn level on every broadcast will spam logs during normal operation. This appears to be debug logging left over from development.
```diff
-    warn!(clients=?clients);
+    tracing::debug!(clients=?clients);
```

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/sse/mod.rs around lines 99 to 101, the code currently logs connected
clients at warn level (warn!(clients=?clients)); change this to a lower
verbosity (use debug! or trace! instead of warn!) so normal broadcasts don't
spam logs; simply replace the warn! macro with debug! (or trace! if you prefer
more verbosity) and keep the same structured field formatting (clients=?clients)
so only the log level changes.
```rust
pub async fn register_sse_client(
    broadcaster: actix_web::web::Data<Arc<Broadcaster>>,
    req: HttpRequest,
) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
    let session = extract_session_key_from_req(&req).unwrap();
    let sessionid = match session {
        SessionKey::SessionId(ulid) => ulid,
        _ => unreachable!("SSE requires a valid session. Unable to register client."),
    };
    broadcaster.new_client(&sessionid).await
}
```
Replace panics with proper error handling.
This function will panic on unauthenticated requests (.unwrap() on line 134) or when a user authenticates via BasicAuth instead of session cookie (unreachable!() on line 137). HTTP handlers should not panic; they should return appropriate error responses.
Consider returning a Result or an Actix error response:
```diff
 pub async fn register_sse_client(
     broadcaster: actix_web::web::Data<Arc<Broadcaster>>,
     req: HttpRequest,
-) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
-    let session = extract_session_key_from_req(&req).unwrap();
-    let sessionid = match session {
-        SessionKey::SessionId(ulid) => ulid,
-        _ => unreachable!("SSE requires a valid session. Unable to register client."),
-    };
-    broadcaster.new_client(&sessionid).await
+) -> Result<Sse<InfallibleStream<ReceiverStream<sse::Event>>>, actix_web::Error> {
+    let session = extract_session_key_from_req(&req)?;
+    let sessionid = match session {
+        SessionKey::SessionId(ulid) => ulid,
+        SessionKey::BasicAuth { .. } => {
+            return Err(actix_web::error::ErrorBadRequest(
+                "SSE requires session-based authentication, not BasicAuth",
+            ));
+        }
+    };
+    Ok(broadcaster.new_client(&sessionid).await)
 }
```
🤖 Prompt for AI Agents
In src/sse/mod.rs around lines 130–140, replace the panic-causing unwrap() and
unreachable!() with proper error handling and change the handler to return a
Result so unauthorized/invalid requests produce HTTP errors instead of panics:
update the function signature to return
Result<Sse<InfallibleStream<ReceiverStream<sse::Event>>>, actix_web::Error> (or
Result<_, HttpResponse>), call extract_session_key_from_req and map its None/Err
to an appropriate HttpResponse::Unauthorized/BadRequest, match SessionKey and
return an Err(HttpResponse::Unauthorized().finish()) when not a SessionId, and
finally return Ok(broadcaster.new_client(&sessionid).await) so any broadcaster
errors can be propagated or converted into an HTTP 500 response.
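The core of the suggestion is returning a `Result` from the handler instead of panicking on the authentication path. A simplified sketch with stand-in `SessionKey` and error types (the real handler returns `actix_web::Error`, which is assumed here, not shown):

```rust
// Stand-in for the RBAC session key; the real enum lives in the rbac module.
#[derive(Debug)]
enum SessionKey {
    SessionId(u64),
    BasicAuth { user: String },
}

// Stand-in for an HTTP error response.
#[derive(Debug, PartialEq)]
enum HandlerError {
    BadRequest(&'static str),
}

// Extract the session id, or return an error value instead of
// unwrap()/unreachable!() so the caller can map it to an HTTP response.
fn session_id(session: SessionKey) -> Result<u64, HandlerError> {
    match session {
        SessionKey::SessionId(id) => Ok(id),
        SessionKey::BasicAuth { .. } => Err(HandlerError::BadRequest(
            "SSE requires session-based authentication, not BasicAuth",
        )),
    }
}

fn main() {
    assert_eq!(session_id(SessionKey::SessionId(7)), Ok(7));
    assert!(session_id(SessionKey::BasicAuth { user: "admin".into() }).is_err());
    println!("ok");
}
```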
Force-pushed from 9e6f085 to 8a35e0d (compare)
Actionable comments posted: 1
♻️ Duplicate comments (3)
src/sse/mod.rs (3)
89-93: Fix race condition with double write lock acquisition. This issue was flagged in a previous review and remains unaddressed. The code acquires a write lock, checks if the session exists, drops the lock, then acquires another write lock in the `else` branch. A concurrent call could insert between these acquisitions. Use the `Entry` API:

```diff
-    if let Some(clients) = self.inner.write().await.clients.get_mut(session) {
-        clients.push(tx);
-    } else {
-        self.inner.write().await.clients.insert(*session, vec![tx]);
-    }
+    self.inner
+        .write()
+        .await
+        .clients
+        .entry(*session)
+        .or_default()
+        .push(tx);
```
141-151: Replace panics with proper error handling. This issue was flagged in a previous review and remains unaddressed. The `unwrap()` on line 145 and `unreachable!()` on line 148 will panic on unauthenticated requests or BasicAuth usage. HTTP handlers should return appropriate error responses instead:

```diff
 pub async fn register_sse_client(
     broadcaster: actix_web::web::Data<Arc<Broadcaster>>,
     req: HttpRequest,
-) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
-    let session = extract_session_key_from_req(&req).unwrap();
-    let sessionid = match session {
-        SessionKey::SessionId(ulid) => ulid,
-        _ => unreachable!("SSE requires a valid session. Unable to register client."),
-    };
-    broadcaster.new_client(&sessionid).await
+) -> Result<Sse<InfallibleStream<ReceiverStream<sse::Event>>>, actix_web::Error> {
+    let session = extract_session_key_from_req(&req)?;
+    let sessionid = match session {
+        SessionKey::SessionId(ulid) => ulid,
+        SessionKey::BasicAuth { .. } => {
+            return Err(actix_web::error::ErrorBadRequest(
+                "SSE requires session-based authentication, not BasicAuth",
+            ));
+        }
+    };
+    Ok(broadcaster.new_client(&sessionid).await)
 }
```
184-188: `ControlPlaneEvent.message` field is still private. A previous review noted that private fields prevent external instantiation. While `SSEAlertInfo` fields were made public, `ControlPlaneEvent.message` remains private:

```diff
 pub struct ControlPlaneEvent {
-    message: String,
+    pub message: String,
 }
```
🧹 Nitpick comments (1)
src/sse/mod.rs (1)
57-78: Consider avoiding full HashMap clone in `remove_stale_clients`. Cloning the entire `clients` HashMap on line 59 could become expensive with many connected clients. Consider iterating with the write lock held and mutating in place, or using `retain` semantics:

```diff
 async fn remove_stale_clients(&self) {
-    let sse_inner = self.inner.read().await.clients.clone();
-
-    let mut ok_sessions = HashMap::new();
-
-    for (session, clients) in sse_inner.iter() {
-        let mut ok_clients = Vec::new();
-        for client in clients {
-            if client
-                .send(sse::Event::Comment("ping".into()))
-                .await
-                .is_ok()
-            {
-                ok_clients.push(client.clone())
+    let mut guard = self.inner.write().await;
+    for clients in guard.clients.values_mut() {
+        let mut ok_clients = Vec::with_capacity(clients.len());
+        for client in clients.drain(..) {
+            if client.send(sse::Event::Comment("ping".into())).await.is_ok() {
+                ok_clients.push(client);
             }
         }
-        ok_sessions.insert(*session, ok_clients);
+        *clients = ok_clients;
     }
-
-    self.inner.write().await.clients = ok_sessions;
+    guard.clients.retain(|_, v| !v.is_empty());
 }
```

This avoids cloning and removes empty session entries.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- `src/alerts/mod.rs` (2 hunks)
- `src/rbac/map.rs` (2 hunks)
- `src/sse/mod.rs` (1 hunks)
🧰 Additional context used
🧠 Learnings (6)
📚 Learning: 2025-07-24T11:09:21.781Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1388
File: src/alerts/mod.rs:88-104
Timestamp: 2025-07-24T11:09:21.781Z
Learning: In the Parseable alert system (src/alerts/mod.rs), alert versions are server-generated and controlled via CURRENT_ALERTS_VERSION constant, not user input. The AlertVerison enum's From<&str> implementation correctly defaults unknown versions to V2 since the server only generates known versions (v1, v2). Unknown versions would only occur in exceptional cases like file corruption, making the current fallback approach appropriate.
Applied to files:
src/alerts/mod.rs
📚 Learning: 2025-06-18T06:39:04.775Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1340
File: src/query/mod.rs:64-66
Timestamp: 2025-06-18T06:39:04.775Z
Learning: In src/query/mod.rs, QUERY_SESSION_STATE and QUERY_SESSION serve different architectural purposes: QUERY_SESSION_STATE is used for stats calculation and allows dynamic registration of individual parquet files from the staging path (files created every minute), while QUERY_SESSION is used for object store queries with the global schema provider. Session contexts with schema providers don't support registering individual tables/parquets, so both session objects are necessary for their respective use cases.
Applied to files:
src/alerts/mod.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
Applied to files:
src/alerts/mod.rs
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).
Applied to files:
src/alerts/mod.rs
📚 Learning: 2025-09-18T09:52:07.554Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/storage/object_storage.rs:173-177
Timestamp: 2025-09-18T09:52:07.554Z
Learning: In Parseable's upload system (src/storage/object_storage.rs), the update_storage_metrics function can safely use path.metadata().map_err() to fail on local file metadata read failures because parquet validation (validate_uploaded_parquet_file) ensures file integrity before this step, and the system guarantees local staging files remain accessible throughout the upload flow.
Applied to files:
src/alerts/mod.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.
Applied to files:
src/alerts/mod.rs
🧬 Code graph analysis (2)
src/alerts/mod.rs (2)
src/rbac/map.rs (1)
sessions (92-98)
src/utils/mod.rs (1)
user_auth_for_query(78-87)
src/sse/mod.rs (1)
src/utils/actix.rs (1)
extract_session_key_from_req(51-71)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: coverage
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-pc-windows-msvc
🔇 Additional comments (2)
src/rbac/map.rs (1)
172-174: LGTM! The `get_active_sessions` method cleanly exposes the session keys needed for SSE broadcasting. The implementation correctly clones keys to avoid holding the lock longer than necessary.
src/alerts/mod.rs (1)
617-636: SSE broadcast integration looks good. The authorization check correctly filters sessions to only broadcast alerts to users who have access to the alert's query. The silent skip on serialization failure (line 629) is acceptable since `SSEAlertInfo` serialization is unlikely to fail.
```rust
) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
    let (tx, rx) = mpsc::channel(10);

    tx.send(sse::Data::new("connected").into()).await.unwrap();
```
Avoid unwrap() on initial send.
The unwrap() will panic if the channel is already closed. While unlikely immediately after creation, it's safer to handle this gracefully:
```diff
-    tx.send(sse::Data::new("connected").into()).await.unwrap();
+    let _ = tx.send(sse::Data::new("connected").into()).await;
```

The client won't receive the "connected" message if the send fails, but the connection will still be registered and subsequent broadcasts will work (or get cleaned up by the ping loop).
🤖 Prompt for AI Agents
In src/sse/mod.rs around line 87, the call tx.send(...).await.unwrap() can panic
if the channel is closed; change it to handle the Result from await instead of
unwrapping: await the send, match or if let on the Result, and on Err just
log/debug/ignore the failure and continue (do not return or panic) so the
connection is still registered and normal cleanup/broadcast behavior can
proceed.
Actionable comments posted: 0
🧹 Nitpick comments (3)
src/alerts/mod.rs (3)
610-610: Good optimization using `clone_from`. Using `clone_from` instead of direct assignment with `clone()` can be more efficient as it may reuse existing allocations.
618-627: Consider parallelizing authorization checks for better performance. The code sequentially iterates through all active sessions and performs async authorization checks. If there are many active sessions, this could introduce noticeable latency in the alert notification path. Consider parallelizing the authorization checks similar to the pattern used in `list_alerts_for_user` (lines 1137-1155):

```diff
-let active_sessions = sessions().get_active_sessions();
-let mut broadcast_to = vec![];
-for session in active_sessions {
-    if user_auth_for_query(&session, &self.query).await.is_ok()
-        && let SessionKey::SessionId(id) = &session
-    {
-        broadcast_to.push(*id);
-    }
-}
+let active_sessions = sessions().get_active_sessions();
+let auth_futures: Vec<_> = active_sessions
+    .into_iter()
+    .map(|session| async move {
+        if user_auth_for_query(&session, &self.query).await.is_ok() {
+            if let SessionKey::SessionId(id) = session {
+                Some(id)
+            } else {
+                None
+            }
+        } else {
+            None
+        }
+    })
+    .collect();
+let broadcast_to: Vec<_> = futures::future::join_all(auth_futures)
+    .await
+    .into_iter()
+    .flatten()
+    .collect();
```
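The target-selection logic under review reduces to filtering sessions by an authorization predicate and keeping only the `SessionId` variants. A synchronous sketch with a stand-in `SessionKey` and a closure in place of the async `user_auth_for_query`:

```rust
// Stand-in for the RBAC session key; u64 replaces the real Ulid.
enum SessionKey {
    SessionId(u64),
    BasicAuth,
}

// Keep the ids of sessions that pass the (stand-in) authorization check
// and actually carry a session id; BasicAuth sessions are skipped.
fn broadcast_targets(sessions: &[SessionKey], authorized: impl Fn(&SessionKey) -> bool) -> Vec<u64> {
    sessions
        .iter()
        .filter(|s| authorized(s))
        .filter_map(|s| match s {
            SessionKey::SessionId(id) => Some(*id),
            SessionKey::BasicAuth => None,
        })
        .collect()
}

fn main() {
    let sessions = vec![
        SessionKey::SessionId(1),
        SessionKey::BasicAuth,
        SessionKey::SessionId(3),
    ];
    // Pretend session 3 fails the authorization check.
    let targets = broadcast_targets(&sessions, |s| !matches!(s, SessionKey::SessionId(3)));
    assert_eq!(targets, vec![1]);
    println!("ok");
}
```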
629-635: Add error logging for SSE serialization and broadcast failures. The code silently ignores serialization errors and doesn't handle potential broadcast failures. This could make debugging SSE issues difficult. Apply this diff to add error logging:

```diff
-if let Ok(msg) = &serde_json::to_string(&SSEAlertInfo {
+match serde_json::to_string(&SSEAlertInfo {
     id: self.id,
     state: self.state,
     message,
 }) {
-    SSE_HANDLER.broadcast(msg, Some(&broadcast_to)).await;
+    Ok(msg) => {
+        SSE_HANDLER.broadcast(&msg, Some(&broadcast_to)).await;
+    }
+    Err(e) => {
+        error!("Failed to serialize SSE alert info for alert {}: {}", self.id, e);
+    }
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (5)
- `Cargo.toml` (2 hunks)
- `src/alerts/mod.rs` (2 hunks)
- `src/lib.rs` (1 hunks)
- `src/rbac/map.rs` (2 hunks)
- `src/sse/mod.rs` (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- Cargo.toml
- src/rbac/map.rs
- src/sse/mod.rs
- src/lib.rs
🧰 Additional context used
🧠 Learnings (6)
📚 Learning: 2025-07-24T11:09:21.781Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1388
File: src/alerts/mod.rs:88-104
Timestamp: 2025-07-24T11:09:21.781Z
Learning: In the Parseable alert system (src/alerts/mod.rs), alert versions are server-generated and controlled via CURRENT_ALERTS_VERSION constant, not user input. The AlertVerison enum's From<&str> implementation correctly defaults unknown versions to V2 since the server only generates known versions (v1, v2). Unknown versions would only occur in exceptional cases like file corruption, making the current fallback approach appropriate.
Applied to files:
src/alerts/mod.rs
📚 Learning: 2025-06-18T06:39:04.775Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1340
File: src/query/mod.rs:64-66
Timestamp: 2025-06-18T06:39:04.775Z
Learning: In src/query/mod.rs, QUERY_SESSION_STATE and QUERY_SESSION serve different architectural purposes: QUERY_SESSION_STATE is used for stats calculation and allows dynamic registration of individual parquet files from the staging path (files created every minute), while QUERY_SESSION is used for object store queries with the global schema provider. Session contexts with schema providers don't support registering individual tables/parquets, so both session objects are necessary for their respective use cases.
Applied to files:
src/alerts/mod.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
Repo: parseablehq/parseable PR: 1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.
Applied to files:
src/alerts/mod.rs
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).
Applied to files:
src/alerts/mod.rs
📚 Learning: 2025-09-18T09:52:07.554Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/storage/object_storage.rs:173-177
Timestamp: 2025-09-18T09:52:07.554Z
Learning: In Parseable's upload system (src/storage/object_storage.rs), the update_storage_metrics function can safely use path.metadata().map_err() to fail on local file metadata read failures because parquet validation (validate_uploaded_parquet_file) ensures file integrity before this step, and the system guarantees local staging files remain accessible throughout the upload flow.
Applied to files:
src/alerts/mod.rs
📚 Learning: 2025-10-21T02:22:24.403Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.403Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.
Applied to files:
src/alerts/mod.rs
🧬 Code graph analysis (1)
src/alerts/mod.rs (2)
src/rbac/map.rs (1)
sessions (92-98)
src/utils/mod.rs (1)
user_auth_for_query(78-87)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-unknown-linux-gnu
🔇 Additional comments (2)
src/alerts/mod.rs (2)
618-636: SSE types and handler method signatures are correct. Verification confirms the `SSEAlertInfo` struct has the expected fields (`id: Ulid`, `state: AlertState`, `message: String`) and `SSE_HANDLER.broadcast(&self, msg: &str, sessions: Option<&[Ulid]>)` accepts the correct parameter types. The code usage is valid.
65-66: Imports are correct. Both `SSE_HANDLER` and `SSEAlertInfo` are properly exported from `src/sse/mod.rs`.
Add backend for SSE
Fixes #XXXX.
Description
This PR has: