From 6a81bb77d79d1dcf6f2910133177a8342c615248 Mon Sep 17 00:00:00 2001 From: Thomas Tanon Date: Sat, 29 Nov 2025 18:39:16 +0100 Subject: [PATCH 1/6] CteWorkTable: properly apply TableProvider::scan projection argument It was previously ignored --- datafusion/catalog/src/cte_worktable.rs | 16 +++++--- datafusion/physical-plan/src/work_table.rs | 46 ++++++++++++++++------ 2 files changed, 45 insertions(+), 17 deletions(-) diff --git a/datafusion/catalog/src/cte_worktable.rs b/datafusion/catalog/src/cte_worktable.rs index d6b2a453118c..5475a7cf55ba 100644 --- a/datafusion/catalog/src/cte_worktable.rs +++ b/datafusion/catalog/src/cte_worktable.rs @@ -23,6 +23,7 @@ use std::{any::Any, borrow::Cow}; use crate::Session; use arrow::datatypes::SchemaRef; use async_trait::async_trait; +use datafusion_common::{assert_or_internal_err, DataFusionError}; use datafusion_physical_plan::work_table::WorkTableExec; use datafusion_physical_plan::ExecutionPlan; @@ -86,15 +87,20 @@ impl TableProvider for CteWorkTable { async fn scan( &self, _state: &dyn Session, - _projection: Option<&Vec>, - _filters: &[Expr], - _limit: Option, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, ) -> Result> { - // TODO: pushdown filters and limits + assert_or_internal_err!( + filters.is_empty(), + "CteWorkTable does not support pushing filters" + ); + assert_or_internal_err!(limit.is_none(), "CteWorkTable pushing limit"); Ok(Arc::new(WorkTableExec::new( self.name.clone(), Arc::clone(&self.table_schema), - ))) + projection.cloned(), + )?)) } fn supports_filters_pushdown( diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index ba7c98c26480..ccb6ccf99013 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -100,25 +100,35 @@ pub struct WorkTableExec { name: String, /// The schema of the stream schema: SchemaRef, + /// Projection to apply to build the output stream from the recursion state + projection: Option>, /// The work table work_table: Arc, /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + plan_properties: PlanProperties, } impl WorkTableExec { /// Create a new execution plan for a worktable exec. - pub fn new(name: String, schema: SchemaRef) -> Self { - let cache = Self::compute_properties(Arc::clone(&schema)); - Self { + pub fn new( + name: String, + mut schema: SchemaRef, + projection: Option>, + ) -> Result { + if let Some(projection) = &projection { + schema = Arc::new(schema.project(projection)?); + } + let plan_properties = Self::compute_properties(Arc::clone(&schema)); + Ok(Self { name, schema, + projection, metrics: ExecutionPlanMetricsSet::new(), work_table: Arc::new(WorkTable::new()), - cache, - } + plan_properties, + }) } /// Ref to name @@ -170,7 +180,7 @@ impl ExecutionPlan for WorkTableExec { } fn properties(&self) -> &PlanProperties { - &self.cache + &self.plan_properties } fn children(&self) -> Vec<&Arc> { @@ -196,11 +206,22 @@ impl ExecutionPlan for WorkTableExec { 0, "WorkTableExec got an invalid partition {partition} (expected 0)" ); - let batch = self.work_table.take()?; + let ReservedBatches { + mut batches, + reservation, + } = self.work_table.take()?; + if let Some(projection) = &self.projection { + // We apply the projection + // TODO: it would be better to apply it as soon as possible and not only here + // TODO: an aggressive projection makes the memory reservation smaller, even if we do not edit it + batches = batches + .into_iter() + .map(|b| b.project(projection)) + .collect::, _>>()?; + } - let stream = - MemoryStream::try_new(batch.batches, Arc::clone(&self.schema), None)? - .with_reservation(batch.reservation); + let stream = MemoryStream::try_new(batches, Arc::clone(&self.schema), None)? + .with_reservation(reservation); Ok(Box::pin(cooperative(stream))) } @@ -233,9 +254,10 @@ impl ExecutionPlan for WorkTableExec { Some(Arc::new(Self { name: self.name.clone(), schema: Arc::clone(&self.schema), + projection: self.projection.clone(), metrics: ExecutionPlanMetricsSet::new(), work_table, - cache: self.cache.clone(), + plan_properties: self.plan_properties.clone(), })) } } From 91bb0720cda6c28d0248cf2194519f9e99add738 Mon Sep 17 00:00:00 2001 From: Thomas Tanon Date: Sun, 30 Nov 2025 16:14:37 +0100 Subject: [PATCH 2/6] Rename plan_properties into cache --- datafusion/physical-plan/src/work_table.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index ccb6ccf99013..6e845245e9b4 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -107,7 +107,7 @@ pub struct WorkTableExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - plan_properties: PlanProperties, + cache: PlanProperties, } impl WorkTableExec { @@ -127,7 +127,7 @@ impl WorkTableExec { projection, metrics: ExecutionPlanMetricsSet::new(), work_table: Arc::new(WorkTable::new()), - plan_properties, + cache: plan_properties, }) } @@ -180,7 +180,7 @@ impl ExecutionPlan for WorkTableExec { } fn properties(&self) -> &PlanProperties { - &self.plan_properties + &self.cache } fn children(&self) -> Vec<&Arc> { @@ -257,7 +257,7 @@ impl ExecutionPlan for WorkTableExec { projection: self.projection.clone(), metrics: ExecutionPlanMetricsSet::new(), work_table, - plan_properties: self.plan_properties.clone(), + cache: self.cache.clone(), })) } } From 853c24ee5592130b0fd3383e060bfd73f82d3dab Mon Sep 17 00:00:00 2001 From: Thomas Tanon Date: Sun, 30 Nov 2025 16:27:20 +0100 Subject: [PATCH 3/6] Adds a projection test --- datafusion/physical-plan/src/work_table.rs | 53 +++++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index 6e845245e9b4..8fda6904213c 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -265,8 +265,10 @@ impl ExecutionPlan for WorkTableExec { #[cfg(test)] mod tests { use super::*; - use arrow::array::{ArrayRef, Int32Array}; + use arrow::array::{ArrayRef, Int16Array, Int32Array, Int64Array}; + use arrow_schema::{DataType, Field, Schema}; use datafusion_execution::memory_pool::{MemoryConsumer, UnboundedMemoryPool}; + use futures::StreamExt; #[test] fn test_work_table() { @@ -299,4 +301,53 @@ mod tests { drop(memory_stream); assert_eq!(pool.reserved(), 0); } + + #[tokio::test] + async fn test_work_table_exec() { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int64, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int16, false), + ])); + let work_table_exec = + WorkTableExec::new("wt".into(), Arc::clone(&schema), Some(vec![2, 1])) + .unwrap(); + + // We inject the work table + let work_table = Arc::new(WorkTable::new()); + let work_table_exec = work_table_exec + .with_new_state(Arc::clone(&work_table) as _) + .unwrap(); + + // We update the work table + let pool = Arc::new(UnboundedMemoryPool::default()) as _; + let reservation = MemoryConsumer::new("test_work_table").register(&pool); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])), + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])), + Arc::new(Int16Array::from(vec![1, 2, 3, 4, 5])), + ], + ) + .unwrap(); + work_table.update(ReservedBatches::new(vec![batch], reservation)); + + // We get back the batch from the work table + let returned_batch = work_table_exec + .execute(0, Arc::new(TaskContext::default())) + .unwrap() + .next() + .await + .unwrap() + .unwrap(); + assert_eq!( + returned_batch, + RecordBatch::try_from_iter(vec![ + ("c", Arc::new(Int16Array::from(vec![1, 2, 3, 4, 5])) as _), + ("b", Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])) as _), + ]) + .unwrap() + ); + } } From bf9e4ee93c3c225caea61e8a6ca3fe7f84da4310 Mon Sep 17 00:00:00 2001 From: Thomas Tanon Date: Sun, 30 Nov 2025 16:28:54 +0100 Subject: [PATCH 4/6] Improve error message --- datafusion/catalog/src/cte_worktable.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datafusion/catalog/src/cte_worktable.rs b/datafusion/catalog/src/cte_worktable.rs index 5475a7cf55ba..22e910760c6d 100644 --- a/datafusion/catalog/src/cte_worktable.rs +++ b/datafusion/catalog/src/cte_worktable.rs @@ -95,7 +95,10 @@ impl TableProvider for CteWorkTable { filters.is_empty(), "CteWorkTable does not support pushing filters" ); - assert_or_internal_err!(limit.is_none(), "CteWorkTable pushing limit"); + assert_or_internal_err!( + limit.is_none(), + "CteWorkTable does not support limit pushdown" + ); Ok(Arc::new(WorkTableExec::new( self.name.clone(), Arc::clone(&self.table_schema), From 25f446596a4995d7fb6c7c58d65e2b4949fda0e0 Mon Sep 17 00:00:00 2001 From: Thomas Tanon Date: Fri, 12 Dec 2025 08:59:00 +0100 Subject: [PATCH 5/6] Implement TableProvider::scan_with_args --- datafusion/catalog/src/cte_worktable.rs | 41 +++++++++++++------------ 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/datafusion/catalog/src/cte_worktable.rs b/datafusion/catalog/src/cte_worktable.rs index 22e910760c6d..adab76dd138d 100644 --- a/datafusion/catalog/src/cte_worktable.rs +++ b/datafusion/catalog/src/cte_worktable.rs @@ -17,21 +17,18 @@ //! CteWorkTable implementation used for recursive queries +use std::any::Any; +use std::borrow::Cow; use std::sync::Arc; -use std::{any::Any, borrow::Cow}; -use crate::Session; use arrow::datatypes::SchemaRef; use async_trait::async_trait; -use datafusion_common::{assert_or_internal_err, DataFusionError}; -use datafusion_physical_plan::work_table::WorkTableExec; - -use datafusion_physical_plan::ExecutionPlan; - use datafusion_common::error::Result; use datafusion_expr::{Expr, LogicalPlan, TableProviderFilterPushDown, TableType}; +use datafusion_physical_plan::work_table::WorkTableExec; +use datafusion_physical_plan::ExecutionPlan; -use crate::TableProvider; +use crate::{ScanArgs, ScanResult, Session, TableProvider}; /// The temporary working table where the previous iteration of a recursive query is stored /// Naming is based on PostgreSQL's implementation. @@ -86,24 +83,28 @@ impl TableProvider for CteWorkTable { async fn scan( &self, - _state: &dyn Session, + state: &dyn Session, projection: Option<&Vec>, filters: &[Expr], limit: Option, ) -> Result> { - assert_or_internal_err!( - filters.is_empty(), - "CteWorkTable does not support pushing filters" - ); - assert_or_internal_err!( - limit.is_none(), - "CteWorkTable does not support limit pushdown" - ); - Ok(Arc::new(WorkTableExec::new( + let options = ScanArgs::default() + .with_projection(projection.map(|p| p.as_slice())) + .with_filters(Some(filters)) + .with_limit(limit); + Ok(self.scan_with_args(state, options).await?.into_inner()) + } + + async fn scan_with_args<'a>( + &self, + _state: &dyn Session, + args: ScanArgs<'a>, + ) -> Result { + Ok(ScanResult::new(Arc::new(WorkTableExec::new( self.name.clone(), Arc::clone(&self.table_schema), - projection.cloned(), - )?)) + args.projection().map(|p| p.to_vec()), + )?))) } fn supports_filters_pushdown( From 171e222cfb76b38e0082306a1bf9481d90fbb51e Mon Sep 17 00:00:00 2001 From: Thomas Tanon Date: Fri, 26 Dec 2025 09:57:53 +0100 Subject: [PATCH 6/6] Fixes CI --- datafusion/catalog/src/cte_worktable.rs | 2 +- datafusion/physical-plan/src/work_table.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/catalog/src/cte_worktable.rs b/datafusion/catalog/src/cte_worktable.rs index adab76dd138d..9565dcc60141 100644 --- a/datafusion/catalog/src/cte_worktable.rs +++ b/datafusion/catalog/src/cte_worktable.rs @@ -25,8 +25,8 @@ use arrow::datatypes::SchemaRef; use async_trait::async_trait; use datafusion_common::error::Result; use datafusion_expr::{Expr, LogicalPlan, TableProviderFilterPushDown, TableType}; -use datafusion_physical_plan::work_table::WorkTableExec; use datafusion_physical_plan::ExecutionPlan; +use datafusion_physical_plan::work_table::WorkTableExec; use crate::{ScanArgs, ScanResult, Session, TableProvider}; diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index e3a48af25b43..f1b9e3e88d12 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -320,7 +320,7 @@ mod tests { .unwrap(); // We inject the work table - let work_table = Arc::new(WorkTable::new()); + let work_table = Arc::new(WorkTable::new("wt".into())); let work_table_exec = work_table_exec .with_new_state(Arc::clone(&work_table) as _) .unwrap();