196 changes: 191 additions & 5 deletions src/query/mod.rs
@@ -21,26 +21,38 @@ mod listing_table_builder
pub mod stream_schema_provider;

use actix_web::Either;
use arrow_schema::SchemaRef;
use chrono::NaiveDateTime;
use chrono::{DateTime, Duration, Utc};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::tree_node::Transformed;
use datafusion::execution::disk_manager::DiskManager;
use datafusion::execution::{SendableRecordBatchStream, SessionState, SessionStateBuilder};
use datafusion::execution::{
RecordBatchStream, SendableRecordBatchStream, SessionState, SessionStateBuilder,
};
use datafusion::logical_expr::expr::Alias;
use datafusion::logical_expr::{
Aggregate, Explain, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan,
};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
ExecutionPlan, ExecutionPlanProperties, collect_partitioned, execute_stream_partitioned,
};
use datafusion::prelude::*;
use datafusion::sql::parser::DFParser;
use datafusion::sql::resolve::resolve_table_references;
use datafusion::sql::sqlparser::dialect::PostgreSqlDialect;
use futures::Stream;
use futures::stream::select_all;
use itertools::Itertools;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::ops::Bound;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use sysinfo::System;
use tokio::runtime::Runtime;

@@ -55,6 +67,7 @@ use crate::catalog::manifest::Manifest;
use crate::catalog::snapshot::Snapshot;
use crate::event::DEFAULT_TIMESTAMP_KEY;
use crate::handlers::http::query::QueryError;
use crate::metrics::increment_bytes_scanned_in_query_by_date;
use crate::option::Mode;
use crate::parseable::PARSEABLE;
use crate::storage::{ObjectStorageProvider, ObjectStoreFormat};
@@ -77,7 +90,27 @@ pub async fn execute(
is_streaming: bool,
) -> Result<
(
Either<Vec<RecordBatch>, SendableRecordBatchStream>,
Either<
Vec<RecordBatch>,
Pin<
Box<
RecordBatchStreamAdapter<
select_all::SelectAll<
Pin<
Box<
dyn RecordBatchStream<
Item = Result<
RecordBatch,
datafusion::error::DataFusionError,
>,
> + Send,
>,
>,
>,
>,
>,
>,
>,
Vec<String>,
),
ExecuteError,
@@ -178,7 +211,27 @@ impl Query {
is_streaming: bool,
) -> Result<
(
Either<Vec<RecordBatch>, SendableRecordBatchStream>,
Either<
Vec<RecordBatch>,
Pin<
Box<
RecordBatchStreamAdapter<
select_all::SelectAll<
Pin<
Box<
dyn RecordBatchStream<
Item = Result<
RecordBatch,
datafusion::error::DataFusionError,
>,
> + Send,
>,
>,
>,
>,
>,
>,
>,
Vec<String>,
),
ExecuteError,
@@ -199,10 +252,49 @@ impl Query {
return Ok((Either::Left(vec![]), fields));
}

let plan = QUERY_SESSION
.state()
.create_physical_plan(df.logical_plan())
.await?;

let results = if !is_streaming {
Either::Left(df.collect().await?)
let task_ctx = QUERY_SESSION.task_ctx();

let batches = collect_partitioned(plan.clone(), task_ctx.clone())
.await?
.into_iter()
.flatten()
.collect();

let actual_io_bytes = get_total_bytes_scanned(&plan);

// Track billing metrics for query scan
let current_date = chrono::Utc::now().date_naive().to_string();
increment_bytes_scanned_in_query_by_date(actual_io_bytes, &current_date);

Either::Left(batches)
} else {
Either::Right(df.execute_stream().await?)
let task_ctx = QUERY_SESSION.task_ctx();

let output_partitions = plan.output_partitioning().partition_count();

let monitor_state = Arc::new(MonitorState {
plan: plan.clone(),
active_streams: AtomicUsize::new(output_partitions),
});

let streams = execute_stream_partitioned(plan.clone(), task_ctx.clone())?
.into_iter()
.map(|s| {
let wrapped = PartitionedMetricMonitor::new(s, monitor_state.clone());
Box::pin(wrapped) as SendableRecordBatchStream
})
.collect_vec();

let merged_stream = futures::stream::select_all(streams);

let final_stream = RecordBatchStreamAdapter::new(plan.schema(), merged_stream);
Either::Right(Box::pin(final_stream))
};

Ok((results, fields))
@@ -293,6 +385,24 @@ impl Query {
}
}

/// Recursively sums up "bytes_scanned" from all nodes in the plan
fn get_total_bytes_scanned(plan: &Arc<dyn ExecutionPlan>) -> u64 {
let mut total_bytes = 0;

if let Some(metrics) = plan.metrics() {
// "bytes_scanned" is the standard key used by ParquetExec
if let Some(scanned) = metrics.sum_by_name("bytes_scanned") {
total_bytes += scanned.as_usize() as u64;
}
}

for child in plan.children() {
total_bytes += get_total_bytes_scanned(child);
}

total_bytes
}
Comment on lines +388 to +404

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

DataFusion 51.0.0 ParquetExec metrics "bytes_scanned" metric name and sum_by_name return type

💡 Result:

Do you mean the metric named "bytes_scanned" added in DataFusion 51.0.0 (yes — name is bytes_scanned)? Which API do you mean by sum_by_name (the Rust function/crate or the CLI/EXPLAIN display)? I can look up the exact return type once you confirm which sum_by_name you mean. Source: DataFusion 51.0.0 release notes. [1]


🏁 Script executed:

# Check if DataFusion is a dependency and what version
cd /root
fd Cargo.toml -t f | head -20 | xargs grep -l "datafusion" 2>/dev/null | head -5

Repository: parseablehq/parseable

Length of output: 148


🏁 Script executed:

# Search for metric name usage patterns in the codebase
rg "bytes_scanned|metrics\.sum_by_name" -t rust -A 2 -B 2

Repository: parseablehq/parseable

Length of output: 9319


🏁 Script executed:

# Search for ExecutionPlan metrics interface usage
rg "\.metrics\(\)" -t rust -A 3 | head -50

Repository: parseablehq/parseable

Length of output: 367


🌐 Web query:

DataFusion metrics.sum_by_name return type ScalarValue u64

💡 Result:

DataFusion’s metrics API (metrics.sum_by_name) returns a ScalarValue (a dynamically-typed nullable value). For a metric that is a sum it is represented as a u64 inside a ScalarValue::UInt64 (i.e. logically a u64). To get the raw u64 you must match/cast the ScalarValue.

Example:

  • simple match
use datafusion_common::ScalarValue;

let sv: ScalarValue = metrics.sum_by_name("my_sum")?; // example
let value_u64 = match sv {
    ScalarValue::UInt64(Some(v)) => v,
    ScalarValue::UInt64(None) => panic!("metric is NULL"),
    _ => panic!("unexpected ScalarValue type"),
};
  • or using try_into with pattern (requires std::convert::TryInto or helper methods):
if let ScalarValue::UInt64(Some(v)) = sv { /* use v */ }

References: DataFusion ScalarValue docs and Metrics user guide. [1][2]

Sources
[1] ScalarValue enum (datafusion docs).
[2] DataFusion metrics user guide.


🌐 Web query:

DataFusion 51.0.0 ExecutionPlan metrics API u64 accessor bytes_scanned

💡 Result:

Short answer — call ExecutionPlan::metrics(), find the metric named "bytes_scanned" and read its MetricValue::UInt64. Example (DataFusion 51.0.0):

use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::metrics::MetricValue;

fn bytes_scanned_from_plan(plan: &dyn ExecutionPlan) -> Option<u64> {
    let metrics_set = plan.metrics()?; // Option<MetricsSet>
    // MetricsSet exposes an iterator/search over metric entries (name -> MetricValue)
    for metric in metrics_set.iter() {
        if metric.name() == "bytes_scanned" {
            match metric.value() {
                MetricValue::UInt64(v) => return Some(*v),
                MetricValue::UInt64ToString(v) => return v.parse().ok(), // if present as string
                _ => return None,
            }
        }
    }
    None
}

Notes:

  • ExecutionPlan::metrics() is the API used to get per-operator metrics (EXPLAIN ANALYZE shows bytes_scanned). [1]
  • MetricValue variants include a UInt64 variant; read that to get the u64. See the physical-plan metrics types in the crate docs. [2]

Sources:
[1] DataFusion — Reading Explain Plans / EXPLAIN ANALYZE (shows bytes_scanned metric).
[2] datafusion crate docs — physical_plan::metrics (MetricValue / metrics API).


🏁 Script executed:

# Search for DataFusion metrics usage in the codebase to understand return type
rg "sum_by_name|metrics\(\)" -t rust -B 2 -A 5 | grep -A 5 -B 2 "sum_by_name"

Repository: parseablehq/parseable

Length of output: 487


🏁 Script executed:

# Check Cargo.toml for DataFusion version used
cat Cargo.toml | grep -i datafusion

Repository: parseablehq/parseable

Length of output: 119


🌐 Web query:

DataFusion ScalarValue as_usize() method truncation 32-bit u64

💡 Result:

Short answer: yes — when DataFusion (or your code) converts a ScalarValue::UInt64 to usize it uses a plain cast to usize which will truncate on 32‑bit targets. This is a consequence of Rust's as casting semantics (it silently truncates when converting a larger integer to a smaller one) and the fact that usize size is target‑dependent. Use a fallible conversion (e.g. usize::try_from(u64) / TryFrom / TryInto) or explicitly check value <= usize::MAX before converting to avoid silent truncation. (docs.rs)

Suggested pattern:

  • check or try-convert:
    • let n = u64_val.try_into().map_err(|_| ...)?; (or usize::try_from(u64_val))
    • or if u64_val <= usize::MAX as u64 { let n = u64_val as usize; } else { /* handle overflow */ }
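
A self-contained illustration of that pattern in plain std Rust (independent of DataFusion), sketching how a checked conversion surfaces the overflow that a bare `as` cast would silently truncate on a 32-bit target:

use std::convert::TryFrom;

/// Converts a byte count to usize, failing loudly instead of truncating.
fn to_usize_checked(bytes: u64) -> Result<usize, String> {
    usize::try_from(bytes).map_err(|_| format!("{bytes} does not fit in usize on this target"))
}

fn main() {
    // Small values convert on every target.
    assert_eq!(to_usize_checked(4096), Ok(4096));

    // On a 32-bit target this returns Err, whereas `u64::MAX as usize`
    // would silently wrap to the low 32 bits.
    println!("{:?}", to_usize_checked(u64::MAX));
}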

References:

  • DataFusion ScalarValue docs. (docs.rs)
  • Rust integer-cast rules / explanation of as vs From / TryFrom. (stackoverflow.com)
  • Example discussion in DataFusion PR/comments showing use of .as_usize() and related concerns. (mail-archive.com)

🏁 Script executed:

# Search for ScalarValue usage patterns in this codebase to see if there's a safer pattern
rg "ScalarValue" -t rust -A 3 -B 1

Repository: parseablehq/parseable

Length of output: 10510


🏁 Script executed:

# Check if there's any direct u64 extraction in the codebase for metrics
rg "metric|sum_by_name" -t rust -C 3

Repository: parseablehq/parseable

Length of output: 50378


Unsafe truncation when summing bytes scanned — prefer direct u64 extraction.

metrics.sum_by_name() returns a ScalarValue (not raw u64), and calling as_usize() silently truncates on 32-bit systems. Instead of scanned.as_usize() as u64, match the ScalarValue::UInt64 variant directly:

if let Some(ScalarValue::UInt64(Some(v))) = metrics.sum_by_name("bytes_scanned") {
    total_bytes += v;
}

The metric name "bytes_scanned" is confirmed as stable in DataFusion 51.0.0.
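
For reference, a minimal sketch of the whole helper with that suggestion applied. It relies on the assumptions stated in this comment (that sum_by_name yields a ScalarValue wrapping a UInt64), so the exact return type should be verified against the DataFusion version pinned in Cargo.toml before adopting it:

use std::sync::Arc;

use datafusion::common::ScalarValue;
use datafusion::physical_plan::ExecutionPlan;

/// Recursively sums "bytes_scanned" across all nodes of the plan tree.
/// Sketch only: assumes `sum_by_name` returns the ScalarValue described above.
fn get_total_bytes_scanned(plan: &Arc<dyn ExecutionPlan>) -> u64 {
    let mut total_bytes = 0u64;

    if let Some(metrics) = plan.metrics() {
        if let Some(ScalarValue::UInt64(Some(v))) = metrics.sum_by_name("bytes_scanned") {
            // Keep the full u64; no narrowing cast through usize.
            total_bytes += v;
        }
    }

    for child in plan.children() {
        total_bytes += get_total_bytes_scanned(child);
    }

    total_bytes
}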

🤖 Prompt for AI Agents
In src/query/mod.rs around lines 326 to 342, the code uses
metrics.sum_by_name("bytes_scanned") and converts the returned ScalarValue via
scanned.as_usize() as u64 which can silently truncate on 32-bit targets; replace
the cast with a direct pattern match on the ScalarValue::UInt64(Some(v)) (or the
fully-qualified ScalarValue path used in the project) and add v to total_bytes
only in that branch so you preserve the full u64 metric without truncation.


/// Record of counts for a given time bin.
#[derive(Debug, Serialize, Clone, Deserialize)]
pub struct CountsRecord {
@@ -741,6 +851,82 @@ pub mod error {
}
}

/// Shared state across all partitions
struct MonitorState {
plan: Arc<dyn ExecutionPlan>,
active_streams: AtomicUsize,
}

/// A wrapper that monitors the ExecutionPlan and logs metrics when the stream finishes.
pub struct PartitionedMetricMonitor {
// The actual stream doing the work
inner: SendableRecordBatchStream,
/// State of the streams
state: Arc<MonitorState>,
// Ensure we only emit metrics once even if polled after completion/error
is_finished: bool,
}

impl PartitionedMetricMonitor {
fn new(inner: SendableRecordBatchStream, state: Arc<MonitorState>) -> Self {
Self {
inner,
state,
is_finished: false,
}
}
}

impl Stream for PartitionedMetricMonitor {
type Item = datafusion::error::Result<RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.is_finished {
return Poll::Ready(None);
}

let poll = self.inner.as_mut().poll_next(cx);

// Check if the stream just finished
match &poll {
Poll::Ready(None) => {
self.is_finished = true;
self.check_if_last_stream();
}
Poll::Ready(Some(Err(e))) => {
tracing::error!("Stream Failed with error: {}", e);
self.is_finished = true;
self.check_if_last_stream();
}
_ => {}
}

poll
}

fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}

impl RecordBatchStream for PartitionedMetricMonitor {
fn schema(&self) -> SchemaRef {
self.inner.schema()
}
}

impl PartitionedMetricMonitor {
fn check_if_last_stream(&self) {
let prev_count = self.state.active_streams.fetch_sub(1, Ordering::SeqCst);

if prev_count == 1 {
let bytes = get_total_bytes_scanned(&self.state.plan);
let current_date = chrono::Utc::now().date_naive().to_string();
increment_bytes_scanned_in_query_by_date(bytes, &current_date);
}
}
}

#[cfg(test)]
mod tests {
use serde_json::json;
11 changes: 1 addition & 10 deletions src/query/stream_schema_provider.rs
@@ -56,10 +56,7 @@ use crate::{
},
event::DEFAULT_TIMESTAMP_KEY,
hottier::HotTierManager,
metrics::{
QUERY_CACHE_HIT, increment_bytes_scanned_in_query_by_date,
increment_files_scanned_in_query_by_date,
},
metrics::{QUERY_CACHE_HIT, increment_files_scanned_in_query_by_date},
option::Mode,
parseable::{PARSEABLE, STREAM_EXISTS},
storage::{ObjectStorage, ObjectStoreFormat},
@@ -349,7 +346,6 @@ impl StandardTableProvider {
let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
let mut column_statistics = HashMap::<String, Option<TypedStatistics>>::new();
let mut count = 0;
let mut total_compressed_size = 0u64;
let mut file_count = 0u64;
for (index, file) in manifest_files
.into_iter()
@@ -366,9 +362,6 @@ impl StandardTableProvider {

// Track billing metrics for files scanned in query
file_count += 1;
// Calculate actual compressed bytes that will be read from storage
let compressed_bytes: u64 = columns.iter().map(|col| col.compressed_size).sum();
total_compressed_size += compressed_bytes;

// object_store::path::Path doesn't automatically deal with Windows path separators
// to do that, we are using from_absolute_path() which takes into consideration the underlying filesystem
Expand Down Expand Up @@ -429,8 +422,6 @@ impl StandardTableProvider {
// Track billing metrics for query scan
let current_date = chrono::Utc::now().date_naive().to_string();
increment_files_scanned_in_query_by_date(file_count, &current_date);
// Use compressed size as it represents actual bytes read from storage (S3/object store charges)
increment_bytes_scanned_in_query_by_date(total_compressed_size, &current_date);

(partitioned_files, statistics)
}