fix query scan metrics #1483
Conversation
Walkthrough
Replaces the previous streaming return with a pinned, per-partition metric-monitoring stream merged through a RecordBatchStreamAdapter (see the sequence diagram below).
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant QueryExec as Query Executor
    participant PartStream as Partition Stream (inner)
    participant Monitor as PartitionedMetricMonitor
    participant Merge as SelectAll / RecordBatchStreamAdapter
    participant ExecPlan as ExecutionPlan
    participant Metrics
    Client->>QueryExec: execute(query, is_streaming)
    alt streaming
        QueryExec->>PartStream: execute_stream_partitioned(...)
        QueryExec->>Monitor: PartitionedMetricMonitor::new(part_stream, plan, state)
        QueryExec->>Merge: select_all(monitored_streams) -> RecordBatchStreamAdapter
        QueryExec-->>Client: return Pin<Box<RecordBatchStreamAdapter<...>>>
        Client->>Merge: poll_next()
        Merge->>Monitor: poll_next() (per partition)
        Monitor->>PartStream: poll_next()
        PartStream-->>Monitor: RecordBatch / End / Error
        Monitor-->>Merge: forward batch / end / error
        alt final end or error
            Monitor->>ExecPlan: get_total_bytes_scanned(plan)
            ExecPlan-->>Monitor: total_bytes
            Monitor->>Metrics: increment_bytes_scanned_in_query_by_date(date, total)
        end
    else non-streaming
        QueryExec->>ExecPlan: collect_partitioned(...)
        QueryExec->>ExecPlan: get_total_bytes_scanned(plan)
        QueryExec->>Metrics: increment_bytes_scanned_in_query_by_date(date, total)
        QueryExec-->>Client: return Vec<RecordBatch>
    end
```
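For orientation, here is a minimal sketch of the streaming path the diagram describes: each partition stream is wrapped in a monitor that records bytes scanned when its partition finishes, and the monitors are merged with `select_all` behind a `RecordBatchStreamAdapter`. The names `PartitionMonitor`, `record_bytes_scanned`, and `monitored_stream` are illustrative, not the PR's actual identifiers, and the metrics hook is a placeholder; it assumes the public DataFusion APIs referenced elsewhere in this review.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result as DfResult;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
    execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream,
};
use futures::stream::select_all;
use futures::{Stream, StreamExt};

/// Wraps a single partition's stream; when that partition finishes (or
/// errors), it reads the plan's metrics and records bytes scanned.
struct PartitionMonitor {
    inner: SendableRecordBatchStream,
    plan: Arc<dyn ExecutionPlan>,
    done: bool,
}

impl Stream for PartitionMonitor {
    type Item = DfResult<RecordBatch>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        let poll = this.inner.poll_next_unpin(cx);
        if !this.done {
            if let Poll::Ready(None) | Poll::Ready(Some(Err(_))) = &poll {
                this.done = true;
                // Placeholder for the metric emission; the real code sums the
                // "bytes_scanned" metric across the whole plan tree.
                record_bytes_scanned(&this.plan);
            }
        }
        poll
    }
}

fn record_bytes_scanned(_plan: &Arc<dyn ExecutionPlan>) {
    // stand-in for increment_bytes_scanned_in_query_by_date(...)
}

/// Builds the merged, monitored stream returned on the streaming path.
fn monitored_stream(
    plan: Arc<dyn ExecutionPlan>,
    ctx: Arc<TaskContext>,
) -> DfResult<SendableRecordBatchStream> {
    let schema = plan.schema();
    let monitors: Vec<PartitionMonitor> = execute_stream_partitioned(plan.clone(), ctx)?
        .into_iter()
        .map(|inner| PartitionMonitor { inner, plan: plan.clone(), done: false })
        .collect();
    let merged = select_all(monitors);
    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, merged)))
}
```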
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks: ❌ Failed checks (2 warnings), ✅ Passed checks (1 passed)
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/query/mod.rs (1)
806-835: Consider guarding against potential double metric increment.

If `poll_next` is called after the stream returns `None` (which shouldn't happen with well-behaved consumers, but streams aren't required to be fused), `get_total_bytes_scanned` would be called and metrics incremented again.

Consider adding a `done` flag to track completion:

```diff
 pub struct MetricMonitorStream {
     inner: SendableRecordBatchStream,
     plan: Arc<dyn ExecutionPlan>,
+    metrics_recorded: bool,
 }

 impl MetricMonitorStream {
     pub fn new(inner: SendableRecordBatchStream, plan: Arc<dyn ExecutionPlan>) -> Self {
-        Self { inner, plan }
+        Self { inner, plan, metrics_recorded: false }
     }
 }

 impl Stream for MetricMonitorStream {
     type Item = datafusion::error::Result<RecordBatch>;

     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let poll = self.inner.as_mut().poll_next(cx);
         match &poll {
-            Poll::Ready(None) => {
+            Poll::Ready(None) if !self.metrics_recorded => {
+                self.metrics_recorded = true;
                 let bytes = get_total_bytes_scanned(&self.plan);
                 // ...
             }
-            Poll::Ready(Some(Err(e))) => {
+            Poll::Ready(Some(Err(e))) if !self.metrics_recorded => {
+                self.metrics_recorded = true;
                 // ...
             }
             _ => {}
         }
         poll
     }
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/query/mod.rs (7 hunks)
src/query/stream_schema_provider.rs (1 hunk)
🧰 Additional context used
🧠 Learnings (8)
📓 Common learnings
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/azure_blob.rs:693-700
Timestamp: 2025-09-25T07:12:27.407Z
Learning: In the Parseable codebase, object store metrics (increment_object_store_calls_by_date, increment_files_scanned_in_object_store_calls_by_date, etc.) should only be recorded for successful operations, not for failed attempts. The metric calls should be placed after error handling operators like `?` to ensure they only execute when operations succeed.
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/s3.rs:837-844
Timestamp: 2025-09-25T07:13:58.909Z
Learning: In the Parseable codebase, the existing pattern of placing metrics calls after the `?` operator in S3 list_hours method is correct and should not be changed. The metrics are properly recorded only on successful operations.
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.
Applied to files:
src/query/stream_schema_provider.rs
src/query/mod.rs
📚 Learning: 2025-08-25T01:32:25.980Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:163-173
Timestamp: 2025-08-25T01:32:25.980Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metrics/mod.rs are intentionally designed to track global totals across all streams for a given date, using labels ["format", "date"] rather than per-stream labels. This is the correct design for global aggregation purposes.
Applied to files:
src/query/stream_schema_provider.rs
src/query/mod.rs
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).
Applied to files:
src/query/stream_schema_provider.rs
📚 Learning: 2025-08-18T19:10:11.941Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1405
File: src/handlers/http/ingest.rs:163-164
Timestamp: 2025-08-18T19:10:11.941Z
Learning: Field statistics calculation in src/storage/field_stats.rs uses None for the time_partition parameter when calling flatten_and_push_logs(), as field stats generation does not require time partition functionality.
Applied to files:
src/query/stream_schema_provider.rs
📚 Learning: 2025-09-18T09:52:07.554Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/storage/object_storage.rs:173-177
Timestamp: 2025-09-18T09:52:07.554Z
Learning: In Parseable's upload system (src/storage/object_storage.rs), the update_storage_metrics function can safely use path.metadata().map_err() to fail on local file metadata read failures because parquet validation (validate_uploaded_parquet_file) ensures file integrity before this step, and the system guarantees local staging files remain accessible throughout the upload flow.
Applied to files:
src/query/stream_schema_provider.rs
📚 Learning: 2025-10-20T17:48:53.444Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/handlers/http/cluster/mod.rs:1370-1400
Timestamp: 2025-10-20T17:48:53.444Z
Learning: In src/handlers/http/cluster/mod.rs, the billing metrics processing logic should NOT accumulate counter values from multiple Prometheus samples with the same labels. The intended behavior is to convert each received counter from nodes into individual events for ingestion, using `.insert()` to store the counter value directly.
Applied to files:
src/query/mod.rs
📚 Learning: 2025-06-18T06:39:04.775Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1340
File: src/query/mod.rs:64-66
Timestamp: 2025-06-18T06:39:04.775Z
Learning: In src/query/mod.rs, QUERY_SESSION_STATE and QUERY_SESSION serve different architectural purposes: QUERY_SESSION_STATE is used for stats calculation and allows dynamic registration of individual parquet files from the staging path (files created every minute), while QUERY_SESSION is used for object store queries with the global schema provider. Session contexts with schema providers don't support registering individual tables/parquets, so both session objects are necessary for their respective use cases.
Applied to files:
src/query/mod.rs
🧬 Code graph analysis (2)
src/query/stream_schema_provider.rs (1)
src/metrics/mod.rs (1)
increment_files_scanned_in_query_by_date(557-561)
src/query/mod.rs (1)
src/metrics/mod.rs (1)
increment_bytes_scanned_in_query_by_date(563-567)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
🔇 Additional comments (3)
src/query/stream_schema_provider.rs (1)
59-59: LGTM! Clean separation of metrics responsibilities.

The removal of `increment_bytes_scanned_in_query_by_date` here aligns with the PR objective to use ParquetExec's `bytes_scanned` metric instead of manual calculation. File count tracking remains appropriately in `partitioned_files()`.

src/query/mod.rs (2)
210-235: LGTM! Clean metric tracking for both streaming and non-streaming paths.

The implementation correctly:
- Creates the physical plan upfront
- For non-streaming: collects results, then reads `bytes_scanned` from the plan
- For streaming: wraps in `MetricMonitorStream` to defer metric collection until stream completion
326-342: LGTM! Correct recursive metric aggregation.

The helper correctly traverses the entire execution plan tree and sums `bytes_scanned` from all nodes. The metric key `"bytes_scanned"` is the standard used by ParquetExec.
923fc8d to 0495e8a (Compare)
Actionable comments posted: 3
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/query/mod.rs (7 hunks)
src/query/stream_schema_provider.rs (1 hunk)
🧰 Additional context used
🧠 Learnings (8)
📓 Common learnings
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/azure_blob.rs:693-700
Timestamp: 2025-09-25T07:12:27.407Z
Learning: In the Parseable codebase, object store metrics (increment_object_store_calls_by_date, increment_files_scanned_in_object_store_calls_by_date, etc.) should only be recorded for successful operations, not for failed attempts. The metric calls should be placed after error handling operators like `?` to ensure they only execute when operations succeed.
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/s3.rs:837-844
Timestamp: 2025-09-25T07:13:58.909Z
Learning: In the Parseable codebase, the existing pattern of placing metrics calls after the `?` operator in S3 list_hours method is correct and should not be changed. The metrics are properly recorded only on successful operations.
📚 Learning: 2025-10-28T02:10:41.140Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1453
File: src/parseable/mod.rs:397-400
Timestamp: 2025-10-28T02:10:41.140Z
Learning: In Parseable enterprise deployments with multiple query nodes, hot tier configuration must be persisted in object storage so that newly started query nodes can fetch and synchronize the hot tier settings at startup (file: src/parseable/mod.rs, function: create_stream_and_schema_from_storage).
Applied to files:
src/query/stream_schema_provider.rs
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.
Applied to files:
src/query/stream_schema_provider.rs
src/query/mod.rs
📚 Learning: 2025-08-18T19:10:11.941Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1405
File: src/handlers/http/ingest.rs:163-164
Timestamp: 2025-08-18T19:10:11.941Z
Learning: Field statistics calculation in src/storage/field_stats.rs uses None for the time_partition parameter when calling flatten_and_push_logs(), as field stats generation does not require time partition functionality.
Applied to files:
src/query/stream_schema_provider.rs
📚 Learning: 2025-09-18T09:52:07.554Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/storage/object_storage.rs:173-177
Timestamp: 2025-09-18T09:52:07.554Z
Learning: In Parseable's upload system (src/storage/object_storage.rs), the update_storage_metrics function can safely use path.metadata().map_err() to fail on local file metadata read failures because parquet validation (validate_uploaded_parquet_file) ensures file integrity before this step, and the system guarantees local staging files remain accessible throughout the upload flow.
Applied to files:
src/query/stream_schema_provider.rs
📚 Learning: 2025-08-25T01:32:25.980Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:163-173
Timestamp: 2025-08-25T01:32:25.980Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metrics/mod.rs are intentionally designed to track global totals across all streams for a given date, using labels ["format", "date"] rather than per-stream labels. This is the correct design for global aggregation purposes.
Applied to files:
src/query/stream_schema_provider.rs
src/query/mod.rs
📚 Learning: 2025-10-20T17:48:53.444Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/handlers/http/cluster/mod.rs:1370-1400
Timestamp: 2025-10-20T17:48:53.444Z
Learning: In src/handlers/http/cluster/mod.rs, the billing metrics processing logic should NOT accumulate counter values from multiple Prometheus samples with the same labels. The intended behavior is to convert each received counter from nodes into individual events for ingestion, using `.insert()` to store the counter value directly.
Applied to files:
src/query/mod.rs
📚 Learning: 2025-06-18T06:39:04.775Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1340
File: src/query/mod.rs:64-66
Timestamp: 2025-06-18T06:39:04.775Z
Learning: In src/query/mod.rs, QUERY_SESSION_STATE and QUERY_SESSION serve different architectural purposes: QUERY_SESSION_STATE is used for stats calculation and allows dynamic registration of individual parquet files from the staging path (files created every minute), while QUERY_SESSION is used for object store queries with the global schema provider. Session contexts with schema providers don't support registering individual tables/parquets, so both session objects are necessary for their respective use cases.
Applied to files:
src/query/mod.rs
🧬 Code graph analysis (2)
src/query/stream_schema_provider.rs (1)
src/metrics/mod.rs (1)
increment_files_scanned_in_query_by_date(563-567)
src/query/mod.rs (1)
src/metrics/mod.rs (1)
increment_bytes_scanned_in_query_by_date(569-573)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: coverage
🔇 Additional comments (2)
src/query/stream_schema_provider.rs (2)
59-60: No action needed. The current design is correct: `TOTAL_BYTES_SCANNED_IN_QUERY_BY_DATE` is intentionally centralized in `src/query/mod.rs` and emitted during execution (using ParquetExec's metrics), while `stream_schema_provider.rs` only emits `files_scanned` at the plan-building stage. This separation is appropriate because plan-building cannot predict actual bytes read (which depends on runtime filtering and execution context). The removal of bytes-scanned metrics from the imports reflects a deliberate architectural fix to measure actual I/O instead of file-size estimates.

341-427: Metric emitted at plan construction, not execution: move `increment_files_scanned_in_query_by_date()` to execution time.

The metric is labeled "billing metrics for files scanned" but is currently emitted during query plan construction (line 424), not after files are actually read/scanned. Per existing patterns in the codebase, metrics should only be recorded when operations succeed. Consider moving this call to the execution layer (e.g., after `create_parquet_physical_plan()` completes or in the execution plan itself) to ensure it counts only files that are actually processed.
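The "record only on success" convention this comment leans on (and that several of the learnings above capture) can be condensed into a tiny, self-contained sketch; every name here is a hypothetical stand-in, not one of the project's actual helpers.

```rust
// Counter is bumped only after the fallible call has returned Ok via `?`.
fn list_and_count(prefix: &str) -> Result<Vec<String>, std::io::Error> {
    let entries = list_objects(prefix)?; // any error returns before the metric is touched
    increment_files_scanned(entries.len() as u64); // reached only on success
    Ok(entries)
}

fn list_objects(_prefix: &str) -> Result<Vec<String>, std::io::Error> {
    Ok(vec!["a.parquet".into(), "b.parquet".into()]) // stand-in for an object store call
}

fn increment_files_scanned(_n: u64) {
    // stand-in for a dated counter increment
}

fn main() {
    let n = list_and_count("stream/date=2024-01-01/").map(|v| v.len()).unwrap_or(0);
    println!("files counted: {n}");
}
```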
```rust
/// Recursively sums up "bytes_scanned" from all nodes in the plan
fn get_total_bytes_scanned(plan: &Arc<dyn ExecutionPlan>) -> u64 {
    let mut total_bytes = 0;

    if let Some(metrics) = plan.metrics() {
        // "bytes_scanned" is the standard key used by ParquetExec
        if let Some(scanned) = metrics.sum_by_name("bytes_scanned") {
            total_bytes += scanned.as_usize() as u64;
        }
    }

    for child in plan.children() {
        total_bytes += get_total_bytes_scanned(child);
    }

    total_bytes
}
```
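For context, the non-streaming path this review discusses (materialize everything, then read the accumulated metrics once) might look roughly like the hedged sketch below. `execute_non_streaming` and `record_query_bytes_scanned` are illustrative names, and the stubbed `get_total_bytes_scanned` stands in for the reviewed helper quoted above.

```rust
use std::sync::Arc;

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result as DfResult;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::{collect_partitioned, ExecutionPlan};

async fn execute_non_streaming(
    plan: Arc<dyn ExecutionPlan>,
    ctx: Arc<TaskContext>,
) -> DfResult<Vec<RecordBatch>> {
    // Run every partition to completion and flatten the per-partition results.
    let batches: Vec<RecordBatch> = collect_partitioned(plan.clone(), ctx)
        .await?
        .into_iter()
        .flatten()
        .collect();

    // Plan metrics are populated during execution, so read them afterwards.
    let total_bytes = get_total_bytes_scanned(&plan);
    record_query_bytes_scanned(total_bytes);

    Ok(batches)
}

// Placeholder for the dated counter increment used in the codebase.
fn record_query_bytes_scanned(_bytes: u64) {}

// Stub: see the reviewed helper quoted above for the actual implementation.
fn get_total_bytes_scanned(_plan: &Arc<dyn ExecutionPlan>) -> u64 {
    0
}
```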
🧩 Analysis chain
🌐 Web query:
DataFusion 51.0.0 ParquetExec metrics "bytes_scanned" metric name and sum_by_name return type
💡 Result:
Do you mean the metric named "bytes_scanned" added in DataFusion 51.0.0 (yes — name is bytes_scanned)? Which API do you mean by sum_by_name (the Rust function/crate or the CLI/EXPLAIN display)? I can look up the exact return type once you confirm which sum_by_name you mean. Source: DataFusion 51.0.0 release notes. [1]
🏁 Script executed:
```sh
# Check if DataFusion is a dependency and what version
cd /root
fd Cargo.toml -t f | head -20 | xargs grep -l "datafusion" 2>/dev/null | head -5
```

Repository: parseablehq/parseable
Length of output: 148
🏁 Script executed:
```sh
# Search for metric name usage patterns in the codebase
rg "bytes_scanned|metrics\.sum_by_name" -t rust -A 2 -B 2
```

Repository: parseablehq/parseable
Length of output: 9319
🏁 Script executed:
```sh
# Search for ExecutionPlan metrics interface usage
rg "\.metrics\(\)" -t rust -A 3 | head -50
```

Repository: parseablehq/parseable
Length of output: 367
🌐 Web query:
DataFusion metrics.sum_by_name return type ScalarValue u64
💡 Result:
DataFusion’s metrics API (metrics.sum_by_name) returns a ScalarValue (a dynamically-typed nullable value). For a metric that is a sum it is represented as a u64 inside a ScalarValue::UInt64 (i.e. logically a u64). To get the raw u64 you must match/cast the ScalarValue.
Example:
- simple match:

```rust
use datafusion_common::ScalarValue;

let sv: ScalarValue = metrics.sum_by_name("my_sum")?; // example
let value_u64 = match sv {
    ScalarValue::UInt64(Some(v)) => v,
    ScalarValue::UInt64(None) => panic!("metric is NULL"),
    _ => panic!("unexpected ScalarValue type"),
};
```

- or using try_into with a pattern (requires std::convert::TryInto or helper methods):

```rust
if let ScalarValue::UInt64(Some(v)) = sv { /* use v */ }
```
References: DataFusion ScalarValue docs and Metrics user guide. [1][2]
Sources
[1] ScalarValue enum (datafusion docs).
[2] DataFusion metrics user guide.
🌐 Web query:
DataFusion 51.0.0 ExecutionPlan metrics API u64 accessor bytes_scanned
💡 Result:
Short answer — call ExecutionPlan::metrics(), find the metric named "bytes_scanned" and read its MetricValue::UInt64. Example (DataFusion 51.0.0):
```rust
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::metrics::MetricValue;

fn bytes_scanned_from_plan(plan: &dyn ExecutionPlan) -> Option<u64> {
    let metrics_set = plan.metrics()?; // Option<MetricsSet>
    // MetricsSet exposes an iterator/search over metric entries (name -> MetricValue)
    for metric in metrics_set.iter() {
        if metric.name() == "bytes_scanned" {
            match metric.value() {
                MetricValue::UInt64(v) => return Some(*v),
                MetricValue::UInt64ToString(v) => return v.parse().ok(), // if present as string
                _ => return None,
            }
        }
    }
    None
}
```

Notes:
- ExecutionPlan::metrics() is the API used to get per-operator metrics (EXPLAIN ANALYZE shows bytes_scanned). [1]
- MetricValue variants include a UInt64 variant; read that to get the u64. See the physical-plan metrics types in the crate docs. [2]
Sources:
[1] DataFusion — Reading Explain Plans / EXPLAIN ANALYZE (shows bytes_scanned metric).
[2] datafusion crate docs — physical_plan::metrics (MetricValue / metrics API).
🏁 Script executed:
```sh
# Search for DataFusion metrics usage in the codebase to understand return type
rg "sum_by_name|metrics\(\)" -t rust -B 2 -A 5 | grep -A 5 -B 2 "sum_by_name"
```

Repository: parseablehq/parseable
Length of output: 487
🏁 Script executed:
```sh
# Check Cargo.toml for DataFusion version used
cat Cargo.toml | grep -i datafusion
```

Repository: parseablehq/parseable
Length of output: 119
🌐 Web query:
DataFusion ScalarValue as_usize() method truncation 32-bit u64
💡 Result:
Short answer: yes — when DataFusion (or your code) converts a ScalarValue::UInt64 to usize it uses a plain cast to usize which will truncate on 32‑bit targets. This is a consequence of Rust's as casting semantics (it silently truncates when converting a larger integer to a smaller one) and the fact that usize size is target‑dependent. Use a fallible conversion (e.g. usize::try_from(u64) / TryFrom / TryInto) or explicitly check value <= usize::MAX before converting to avoid silent truncation. (docs.rs)
Suggested pattern:
- check or try-convert: `let n = u64_val.try_into().map_err(|_| ...)?;` (or `usize::try_from(u64_val)`)
- or: `if u64_val <= usize::MAX as u64 { let n = u64_val as usize; } else { /* handle overflow */ }`
References:
- DataFusion ScalarValue docs. (docs.rs)
- Rust integer-cast rules / explanation of `as` vs `From`/`TryFrom`. (stackoverflow.com)
- Example discussion in DataFusion PR/comments showing use of `.as_usize()` and related concerns. (mail-archive.com)
Citations:
- 1: https://docs.rs/datafusion/latest/datafusion/scalar/enum.ScalarValue.html?utm_source=openai
- 2: https://docs.rs/datafusion/latest/datafusion/scalar/enum.ScalarValue.html?utm_source=openai
- 3: https://stackoverflow.com/questions/47786322/why-is-type-conversion-from-u64-to-usize-allowed-using-as-but-not-from?utm_source=openai
- 4: https://www.mail-archive.com/github%40datafusion.apache.org/msg58016.html?utm_source=openai
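The fallible conversion suggested above can be shown as a minimal, runnable example; nothing here is project code, only standard library calls.

```rust
// usize::try_from fails instead of silently truncating when the value does not fit.
fn to_usize_checked(v: u64) -> Option<usize> {
    usize::try_from(v).ok()
}

fn main() {
    assert_eq!(to_usize_checked(42), Some(42));
    // On a 32-bit target this prints None; on a 64-bit target the value fits.
    println!("{:?}", to_usize_checked(u64::MAX));
}
```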
🏁 Script executed:
```sh
# Search for ScalarValue usage patterns in this codebase to see if there's a safer pattern
rg "ScalarValue" -t rust -A 3 -B 1
```

Repository: parseablehq/parseable
Length of output: 10510
🏁 Script executed:
```sh
# Check if there's any direct u64 extraction in the codebase for metrics
rg "metric|sum_by_name" -t rust -C 3
```

Repository: parseablehq/parseable
Length of output: 50378
Unsafe truncation when summing bytes scanned — prefer direct u64 extraction.
`metrics.sum_by_name()` returns a ScalarValue (not raw u64), and calling `as_usize()` silently truncates on 32-bit systems. Instead of `scanned.as_usize() as u64`, match the `ScalarValue::UInt64` variant directly:

```rust
if let Some(ScalarValue::UInt64(Some(v))) = metrics.sum_by_name("bytes_scanned") {
    total_bytes += v;
}
```

The metric name "bytes_scanned" is confirmed as stable in DataFusion 51.0.0.
🤖 Prompt for AI Agents
In src/query/mod.rs around lines 326 to 342, the code uses
metrics.sum_by_name("bytes_scanned") and converts the returned ScalarValue via
scanned.as_usize() as u64 which can silently truncate on 32-bit targets; replace
the cast with a direct pattern match on the ScalarValue::UInt64(Some(v)) (or the
fully-qualified ScalarValue path used in the project) and add v to total_bytes
only in that branch so you preserve the full u64 metric without truncation.
Use ParquetExec's `bytes_scanned` metric instead of scanning manually. Implemented the same in case of streaming by wrapping our execution plan with a monitor.
d93aa2e to 78a918c (Compare)
Actionable comments posted: 0
♻️ Duplicate comments (1)
src/query/mod.rs (1)
388-404: The unsafe truncation issue from the previous review was not addressed.

Line 395 still uses `scanned.as_usize() as u64`, which will silently truncate on 32-bit systems. The previous review comment (lines 388-404) explicitly requested direct pattern matching on the `ScalarValue::UInt64` variant.

Apply the recommended fix from the previous review:

```diff
 if let Some(metrics) = plan.metrics() {
     // "bytes_scanned" is the standard key used by ParquetExec
-    if let Some(scanned) = metrics.sum_by_name("bytes_scanned") {
-        total_bytes += scanned.as_usize() as u64;
-    }
+    if let Some(datafusion::common::ScalarValue::UInt64(Some(v))) =
+        metrics.sum_by_name("bytes_scanned")
+    {
+        total_bytes += v;
+    }
 }
```
🧹 Nitpick comments (1)
src/query/mod.rs (1)
88-117: Simplify the public API surface by abstracting the return type.

The return type exposes deeply nested implementation details (`Pin<Box<RecordBatchStreamAdapter<select_all::SelectAll<...>>>>`), making the API brittle and difficult to evolve. Consider using a type alias or returning a trait object:

```rust
// Option 1: Type alias
type QueryResultStream = Pin<Box<dyn RecordBatchStream + Send>>;

pub async fn execute(
    query: Query,
    is_streaming: bool,
) -> Result<(Either<Vec<RecordBatch>, QueryResultStream>, Vec<String>), ExecuteError>
```

Or simplify further by boxing the complex stream:

```rust
// In the streaming path (line 297)
Either::Right(Box::pin(final_stream) as Pin<Box<dyn RecordBatchStream + Send>>)
```

This prevents implementation details from leaking into the public API.
Also applies to: 209-237
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/query/mod.rs (7 hunks)
src/query/stream_schema_provider.rs (1 hunk)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/query/stream_schema_provider.rs
🧰 Additional context used
🧠 Learnings (8)
📓 Common learnings
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/s3.rs:837-844
Timestamp: 2025-09-25T07:13:58.909Z
Learning: In the Parseable codebase, the existing pattern of placing metrics calls after the `?` operator in S3 list_hours method is correct and should not be changed. The metrics are properly recorded only on successful operations.
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/azure_blob.rs:693-700
Timestamp: 2025-09-25T07:12:27.407Z
Learning: In the Parseable codebase, object store metrics (increment_object_store_calls_by_date, increment_files_scanned_in_object_store_calls_by_date, etc.) should only be recorded for successful operations, not for failed attempts. The metric calls should be placed after error handling operators like `?` to ensure they only execute when operations succeed.
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/gcs.rs:625-633
Timestamp: 2025-09-25T07:12:40.189Z
Learning: In Parseable's object storage metrics system, metrics should only be captured when API calls succeed, not when they error out. The current pattern of calling increment_object_store_calls_by_date and related metrics functions after the `?` operator is the correct approach.
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/gcs.rs:674-681
Timestamp: 2025-09-25T07:13:04.112Z
Learning: In the Parseable codebase, object store metrics should only be captured for successful operations. Metric recording calls should be placed after `await?` operators to ensure they only execute when operations succeed, not when they fail.
📚 Learning: 2025-08-25T01:32:25.980Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:163-173
Timestamp: 2025-08-25T01:32:25.980Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metrics/mod.rs are intentionally designed to track global totals across all streams for a given date, using labels ["format", "date"] rather than per-stream labels. This is the correct design for global aggregation purposes.
Applied to files:
src/query/mod.rs
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.
Applied to files:
src/query/mod.rs
📚 Learning: 2025-06-18T06:39:04.775Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1340
File: src/query/mod.rs:64-66
Timestamp: 2025-06-18T06:39:04.775Z
Learning: In src/query/mod.rs, QUERY_SESSION_STATE and QUERY_SESSION serve different architectural purposes: QUERY_SESSION_STATE is used for stats calculation and allows dynamic registration of individual parquet files from the staging path (files created every minute), while QUERY_SESSION is used for object store queries with the global schema provider. Session contexts with schema providers don't support registering individual tables/parquets, so both session objects are necessary for their respective use cases.
Applied to files:
src/query/mod.rs
📚 Learning: 2025-09-18T09:59:20.177Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:700-756
Timestamp: 2025-09-18T09:59:20.177Z
Learning: In src/event/mod.rs, the parsed_timestamp used in increment_events_ingested_by_date() is correctly UTC-normalized: for dynamic streams it remains Utc::now(), and for streams with time partition enabled it uses the time partition value. Both cases result in proper UTC date strings for metrics labeling, preventing double-counting issues.
Applied to files:
src/query/mod.rs
📚 Learning: 2025-09-25T07:12:27.407Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/azure_blob.rs:693-700
Timestamp: 2025-09-25T07:12:27.407Z
Learning: In the Parseable codebase, object store metrics (increment_object_store_calls_by_date, increment_files_scanned_in_object_store_calls_by_date, etc.) should only be recorded for successful operations, not for failed attempts. The metric calls should be placed after error handling operators like `?` to ensure they only execute when operations succeed.
Applied to files:
src/query/mod.rs
📚 Learning: 2025-10-20T17:48:53.444Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1448
File: src/handlers/http/cluster/mod.rs:1370-1400
Timestamp: 2025-10-20T17:48:53.444Z
Learning: In src/handlers/http/cluster/mod.rs, the billing metrics processing logic should NOT accumulate counter values from multiple Prometheus samples with the same labels. The intended behavior is to convert each received counter from nodes into individual events for ingestion, using `.insert()` to store the counter value directly.
Applied to files:
src/query/mod.rs
📚 Learning: 2025-09-25T07:13:04.112Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1441
File: src/storage/gcs.rs:674-681
Timestamp: 2025-09-25T07:13:04.112Z
Learning: In the Parseable codebase, object store metrics should only be captured for successful operations. Metric recording calls should be placed after `await?` operators to ensure they only execute when operations succeed, not when they fail.
Applied to files:
src/query/mod.rs
🧬 Code graph analysis (1)
src/query/mod.rs (1)
src/metrics/mod.rs (1)
increment_bytes_scanned_in_query_by_date(569-573)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: coverage
🔇 Additional comments (3)
src/query/mod.rs (3)
255-298: Excellent fix for multi-partition execution!

The code now correctly handles all partitions:
- Non-streaming: `collect_partitioned` materializes all partitions and flattens them
- Streaming: `execute_stream_partitioned` wraps each partition with metrics monitoring

This addresses the critical data loss issue flagged in the previous review where only partition 0 was being executed.

854-858: Clean coordination design for multi-partition metrics.

The `MonitorState` struct effectively coordinates metric emission across partitions using `AtomicUsize`. When the last partition completes, metrics are recorded exactly once.

860-928: Correct implementation addresses all previous review concerns.

The `PartitionedMetricMonitor` implementation properly addresses the issues from the previous review:
- Emit-once guard: The `is_finished` flag (line 867) prevents re-emission if polled after completion/error.
- Atomic coordination: `fetch_sub` with `SeqCst` ordering (line 920) ensures exactly one partition emits the aggregated metrics.
- Error handling: Errors trigger metric emission before propagation (lines 896-900).
Use ParquetExec's `bytes_scanned` metric instead of scanning manually. Implemented the same in case of streaming by wrapping our execution plan with a monitor.

Fixes #XXXX.