From 4687724796a686747949ec53406abcb11e3c284e Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Sun, 14 Dec 2025 13:03:20 +0530 Subject: [PATCH 1/2] fix query scan metrics Use ParquetExec's `bytes_scanned` metric instead of scanning manually Implemented the same in case of streaming by wrapping our execution plan with a monitor --- src/query/mod.rs | 109 ++++++++++++++++++++++++++-- src/query/stream_schema_provider.rs | 11 +-- 2 files changed, 105 insertions(+), 15 deletions(-) diff --git a/src/query/mod.rs b/src/query/mod.rs index 767bf54c1..e397da96f 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -21,26 +21,33 @@ mod listing_table_builder; pub mod stream_schema_provider; use actix_web::Either; +use arrow_schema::SchemaRef; use chrono::NaiveDateTime; use chrono::{DateTime, Duration, Utc}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::common::tree_node::Transformed; use datafusion::execution::disk_manager::DiskManager; -use datafusion::execution::{SendableRecordBatchStream, SessionState, SessionStateBuilder}; +use datafusion::execution::{ + RecordBatchStream, SendableRecordBatchStream, SessionState, SessionStateBuilder, +}; use datafusion::logical_expr::expr::Alias; use datafusion::logical_expr::{ Aggregate, Explain, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan, }; +use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::*; use datafusion::sql::parser::DFParser; use datafusion::sql::resolve::resolve_table_references; use datafusion::sql::sqlparser::dialect::PostgreSqlDialect; +use futures::Stream; use itertools::Itertools; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; use std::ops::Bound; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use sysinfo::System; use tokio::runtime::Runtime; @@ -55,6 +62,7 @@ use crate::catalog::manifest::Manifest; use crate::catalog::snapshot::Snapshot; use crate::event::DEFAULT_TIMESTAMP_KEY; use 
crate::handlers::http::query::QueryError; +use crate::metrics::increment_bytes_scanned_in_query_by_date; use crate::option::Mode; use crate::parseable::PARSEABLE; use crate::storage::{ObjectStorageProvider, ObjectStoreFormat}; @@ -77,7 +85,7 @@ pub async fn execute( is_streaming: bool, ) -> Result< ( - Either, SendableRecordBatchStream>, + Either, Pin>>, Vec, ), ExecuteError, @@ -178,7 +186,7 @@ impl Query { is_streaming: bool, ) -> Result< ( - Either, SendableRecordBatchStream>, + Either, Pin>>, Vec, ), ExecuteError, @@ -199,10 +207,32 @@ impl Query { return Ok((Either::Left(vec![]), fields)); } + let plan = QUERY_SESSION + .state() + .create_physical_plan(df.logical_plan()) + .await?; + let results = if !is_streaming { - Either::Left(df.collect().await?) + let task_ctx = QUERY_SESSION.task_ctx(); + + let stream = plan.execute(0, task_ctx)?; + let batches = datafusion::physical_plan::common::collect(stream).await?; + + let actual_io_bytes = get_total_bytes_scanned(&plan); + + // Track billing metrics for query scan + let current_date = chrono::Utc::now().date_naive().to_string(); + increment_bytes_scanned_in_query_by_date(actual_io_bytes, &current_date); + + Either::Left(batches) } else { - Either::Right(df.execute_stream().await?) 
+ let task_ctx = QUERY_SESSION.task_ctx(); + + let stream = plan.execute(0, task_ctx)?; + + let monitored_stream = MetricMonitorStream::new(stream, plan.clone()); + + Either::Right(Box::pin(monitored_stream)) }; Ok((results, fields)) @@ -293,6 +323,24 @@ impl Query { } } +/// Recursively sums up "bytes_scanned" from all nodes in the plan +fn get_total_bytes_scanned(plan: &Arc) -> u64 { + let mut total_bytes = 0; + + if let Some(metrics) = plan.metrics() { + // "bytes_scanned" is the standard key used by ParquetExec + if let Some(scanned) = metrics.sum_by_name("bytes_scanned") { + total_bytes += scanned.as_usize() as u64; + } + } + + for child in plan.children() { + total_bytes += get_total_bytes_scanned(child); + } + + total_bytes +} + /// Record of counts for a given time bin. #[derive(Debug, Serialize, Clone, Deserialize)] pub struct CountsRecord { @@ -741,6 +789,57 @@ pub mod error { } } +/// A wrapper that monitors the ExecutionPlan and logs metrics when the stream finishes. +pub struct MetricMonitorStream { + // The actual stream doing the work + inner: SendableRecordBatchStream, + // We hold the plan so we can read metrics after execution + plan: Arc, +} + +impl MetricMonitorStream { + pub fn new(inner: SendableRecordBatchStream, plan: Arc) -> Self { + Self { inner, plan } + } +} + +impl Stream for MetricMonitorStream { + type Item = datafusion::error::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let poll = self.inner.as_mut().poll_next(cx); + + // Check if the stream just finished + match &poll { + Poll::Ready(None) => { + // Stream is done. Now we can safely read the metrics. 
+ let bytes = get_total_bytes_scanned(&self.plan); + let current_date = chrono::Utc::now().date_naive().to_string(); + increment_bytes_scanned_in_query_by_date(bytes, &current_date); + } + Poll::Ready(Some(Err(e))) => { + let bytes = get_total_bytes_scanned(&self.plan); + let current_date = chrono::Utc::now().date_naive().to_string(); + increment_bytes_scanned_in_query_by_date(bytes, &current_date); + tracing::error!("Stream Failed with error: {}", e); + } + _ => {} + } + + poll + } + + fn size_hint(&self) -> (usize, Option) { + (0, None) + } +} + +impl RecordBatchStream for MetricMonitorStream { + fn schema(&self) -> SchemaRef { + self.inner.schema() + } +} + #[cfg(test)] mod tests { use serde_json::json; diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs index 9bc7bbb35..93ac7911a 100644 --- a/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -56,10 +56,7 @@ use crate::{ }, event::DEFAULT_TIMESTAMP_KEY, hottier::HotTierManager, - metrics::{ - QUERY_CACHE_HIT, increment_bytes_scanned_in_query_by_date, - increment_files_scanned_in_query_by_date, - }, + metrics::{QUERY_CACHE_HIT, increment_files_scanned_in_query_by_date}, option::Mode, parseable::{PARSEABLE, STREAM_EXISTS}, storage::{ObjectStorage, ObjectStoreFormat}, @@ -349,7 +346,6 @@ impl StandardTableProvider { let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new())); let mut column_statistics = HashMap::>::new(); let mut count = 0; - let mut total_compressed_size = 0u64; let mut file_count = 0u64; for (index, file) in manifest_files .into_iter() @@ -366,9 +362,6 @@ impl StandardTableProvider { // Track billing metrics for files scanned in query file_count += 1; - // Calculate actual compressed bytes that will be read from storage - let compressed_bytes: u64 = columns.iter().map(|col| col.compressed_size).sum(); - total_compressed_size += compressed_bytes; // object_store::path::Path doesn't automatically deal with Windows path 
separators // to do that, we are using from_absolute_path() which takes into consideration the underlying filesystem @@ -429,8 +422,6 @@ impl StandardTableProvider { // Track billing metrics for query scan let current_date = chrono::Utc::now().date_naive().to_string(); increment_files_scanned_in_query_by_date(file_count, &current_date); - // Use compressed size as it represents actual bytes read from storage (S3/object store charges) - increment_bytes_scanned_in_query_by_date(total_compressed_size, &current_date); (partitioned_files, statistics) } From 78a918c4ff365800f64a12f6d3274a310a5bc630 Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Tue, 16 Dec 2025 18:22:00 +0530 Subject: [PATCH 2/2] bugfix: execution of all partitions --- src/query/mod.rs | 133 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 110 insertions(+), 23 deletions(-) diff --git a/src/query/mod.rs b/src/query/mod.rs index e397da96f..89eef3bce 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -34,12 +34,16 @@ use datafusion::logical_expr::expr::Alias; use datafusion::logical_expr::{ Aggregate, Explain, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan, }; -use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{ + ExecutionPlan, ExecutionPlanProperties, collect_partitioned, execute_stream_partitioned, +}; use datafusion::prelude::*; use datafusion::sql::parser::DFParser; use datafusion::sql::resolve::resolve_table_references; use datafusion::sql::sqlparser::dialect::PostgreSqlDialect; use futures::Stream; +use futures::stream::select_all; use itertools::Itertools; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; @@ -47,6 +51,7 @@ use serde_json::{Value, json}; use std::ops::Bound; use std::pin::Pin; use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Poll}; use sysinfo::System; use tokio::runtime::Runtime; @@ -85,7 +90,27 @@ pub 
async fn execute( is_streaming: bool, ) -> Result< ( - Either, Pin>>, + Either< + Vec, + Pin< + Box< + RecordBatchStreamAdapter< + select_all::SelectAll< + Pin< + Box< + dyn RecordBatchStream< + Item = Result< + RecordBatch, + datafusion::error::DataFusionError, + >, + > + Send, + >, + >, + >, + >, + >, + >, + >, Vec, ), ExecuteError, @@ -186,7 +211,27 @@ impl Query { is_streaming: bool, ) -> Result< ( - Either, Pin>>, + Either< + Vec, + Pin< + Box< + RecordBatchStreamAdapter< + select_all::SelectAll< + Pin< + Box< + dyn RecordBatchStream< + Item = Result< + RecordBatch, + datafusion::error::DataFusionError, + >, + > + Send, + >, + >, + >, + >, + >, + >, + >, Vec, ), ExecuteError, @@ -215,8 +260,11 @@ impl Query { let results = if !is_streaming { let task_ctx = QUERY_SESSION.task_ctx(); - let stream = plan.execute(0, task_ctx)?; - let batches = datafusion::physical_plan::common::collect(stream).await?; + let batches = collect_partitioned(plan.clone(), task_ctx.clone()) + .await? + .into_iter() + .flatten() + .collect(); let actual_io_bytes = get_total_bytes_scanned(&plan); @@ -228,11 +276,25 @@ impl Query { } else { let task_ctx = QUERY_SESSION.task_ctx(); - let stream = plan.execute(0, task_ctx)?; + let output_partitions = plan.output_partitioning().partition_count(); + + let monitor_state = Arc::new(MonitorState { + plan: plan.clone(), + active_streams: AtomicUsize::new(output_partitions), + }); + + let streams = execute_stream_partitioned(plan.clone(), task_ctx.clone())? 
+ .into_iter() + .map(|s| { + let wrapped = PartitionedMetricMonitor::new(s, monitor_state.clone()); + Box::pin(wrapped) as SendableRecordBatchStream + }) + .collect_vec(); - let monitored_stream = MetricMonitorStream::new(stream, plan.clone()); + let merged_stream = futures::stream::select_all(streams); - Either::Right(Box::pin(monitored_stream)) + let final_stream = RecordBatchStreamAdapter::new(plan.schema(), merged_stream); + Either::Right(Box::pin(final_stream)) }; Ok((results, fields)) @@ -789,39 +851,52 @@ pub mod error { } } +/// Shared state across all partitions +struct MonitorState { + plan: Arc, + active_streams: AtomicUsize, +} + /// A wrapper that monitors the ExecutionPlan and logs metrics when the stream finishes. -pub struct MetricMonitorStream { +pub struct PartitionedMetricMonitor { // The actual stream doing the work inner: SendableRecordBatchStream, - // We hold the plan so we can read metrics after execution - plan: Arc, + /// State of the streams + state: Arc, + // Ensure we only emit metrics once even if polled after completion/error + is_finished: bool, } -impl MetricMonitorStream { - pub fn new(inner: SendableRecordBatchStream, plan: Arc) -> Self { - Self { inner, plan } +impl PartitionedMetricMonitor { + fn new(inner: SendableRecordBatchStream, state: Arc) -> Self { + Self { + inner, + state, + is_finished: false, + } } } -impl Stream for MetricMonitorStream { +impl Stream for PartitionedMetricMonitor { type Item = datafusion::error::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.is_finished { + return Poll::Ready(None); + } + let poll = self.inner.as_mut().poll_next(cx); // Check if the stream just finished match &poll { Poll::Ready(None) => { - // Stream is done. Now we can safely read the metrics. 
- let bytes = get_total_bytes_scanned(&self.plan); - let current_date = chrono::Utc::now().date_naive().to_string(); - increment_bytes_scanned_in_query_by_date(bytes, &current_date); + self.is_finished = true; + self.check_if_last_stream(); } Poll::Ready(Some(Err(e))) => { - let bytes = get_total_bytes_scanned(&self.plan); - let current_date = chrono::Utc::now().date_naive().to_string(); - increment_bytes_scanned_in_query_by_date(bytes, &current_date); tracing::error!("Stream Failed with error: {}", e); + self.is_finished = true; + self.check_if_last_stream(); } _ => {} } @@ -834,12 +909,24 @@ impl Stream for MetricMonitorStream { } } -impl RecordBatchStream for MetricMonitorStream { +impl RecordBatchStream for PartitionedMetricMonitor { fn schema(&self) -> SchemaRef { self.inner.schema() } } +impl PartitionedMetricMonitor { + fn check_if_last_stream(&self) { + let prev_count = self.state.active_streams.fetch_sub(1, Ordering::SeqCst); + + if prev_count == 1 { + let bytes = get_total_bytes_scanned(&self.state.plan); + let current_date = chrono::Utc::now().date_naive().to_string(); + increment_bytes_scanned_in_query_by_date(bytes, &current_date); + } + } +} + #[cfg(test)] mod tests { use serde_json::json;