From 4e704e46a29149764ace349eca5c0d9537d84acd Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Tue, 23 Dec 2025 18:54:17 +0530 Subject: [PATCH 1/4] Implement partition_statistics API for NestedLoopJoinExec --- .../partition_statistics.rs | 60 ++++++++++++++++++- .../src/joins/nested_loop_join.rs | 21 ++++--- 2 files changed, 72 insertions(+), 9 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 468d25e0e57d0..2bdc9c77cae0d 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -45,7 +45,7 @@ mod test { use datafusion_physical_plan::common::compute_record_batch_statistics; use datafusion_physical_plan::empty::EmptyExec; use datafusion_physical_plan::filter::FilterExec; - use datafusion_physical_plan::joins::CrossJoinExec; + use datafusion_physical_plan::joins::{CrossJoinExec, NestedLoopJoinExec}; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; @@ -636,6 +636,64 @@ mod test { Ok(()) } + #[tokio::test] + async fn test_statistic_by_partition_of_nested_loop_join() -> Result<()> { + use datafusion_expr::JoinType; + + let left_scan = create_scan_exec_with_statistics(None, Some(2)).await; + let left_scan_coalesced: Arc = + Arc::new(CoalescePartitionsExec::new(left_scan)); + + let right_scan = create_scan_exec_with_statistics(None, Some(2)).await; + + let nested_loop_join: Arc = + Arc::new(NestedLoopJoinExec::try_new( + left_scan_coalesced, + right_scan, + None, + &JoinType::RightSemi, + None, + )?); + + // Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02] + let mut expected_statistic_partition_1 = create_partition_statistics( + 2, + 16, + 3, + 4, + Some((DATE_2025_03_01, DATE_2025_03_02)), + ); + 
expected_statistic_partition_1.num_rows = Precision::Inexact(2); + expected_statistic_partition_1.total_byte_size = Precision::Absent; + + // Partition 2: ids [1,2], dates [2025-03-03, 2025-03-04] + let mut expected_statistic_partition_2 = create_partition_statistics( + 2, + 16, + 1, + 2, + Some((DATE_2025_03_03, DATE_2025_03_04)), + ); + expected_statistic_partition_2.num_rows = Precision::Inexact(2); + expected_statistic_partition_2.total_byte_size = Precision::Absent; + + let statistics = (0..nested_loop_join.output_partitioning().partition_count()) + .map(|idx| nested_loop_join.partition_statistics(Some(idx))) + .collect::>>()?; + assert_eq!(statistics.len(), 2); + assert_eq!(statistics[0], expected_statistic_partition_1); + assert_eq!(statistics[1], expected_statistic_partition_2); + + // Check the statistics_by_partition with real results + let expected_stats = vec![ + ExpectedStatistics::NonEmpty(3, 4, 2), + ExpectedStatistics::NonEmpty(1, 2, 2), + ]; + validate_statistics_with_data(nested_loop_join, expected_stats, 0).await?; + + Ok(()) + } + #[tokio::test] async fn test_statistic_by_partition_of_coalesce_batches() -> Result<()> { let scan = create_scan_exec_with_statistics(None, Some(2)).await; diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 76dca7239114b..9df7fdfe549c4 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -550,17 +550,22 @@ impl ExecutionPlan for NestedLoopJoinExec { } fn partition_statistics(&self, partition: Option) -> Result { - if partition.is_some() { - return Ok(Statistics::new_unknown(&self.schema())); - } let join_columns = Vec::new(); - estimate_join_statistics( - self.left.partition_statistics(None)?, - self.right.partition_statistics(None)?, + let left_stats = self.left.partition_statistics(None)?; + let right_stats = match partition { + Some(partition) => 
self.right.partition_statistics(Some(partition))?, + None => self.right.partition_statistics(None)?, + }; + + let stats = estimate_join_statistics( + left_stats, + right_stats, &join_columns, &self.join_type, - &self.schema(), - ) + &self.join_schema, + )?; + + Ok(stats.project(self.projection.as_ref())) } /// Tries to push `projection` down through `nested_loop_join`. If possible, performs the From 05eb521466854e9479f535cfc8ede6bd18fdc8d5 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Wed, 24 Dec 2025 21:21:52 +0530 Subject: [PATCH 2/4] added comments --- .../physical-plan/src/joins/nested_loop_join.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 9df7fdfe549c4..e457a4fad9221 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -550,7 +550,18 @@ impl ExecutionPlan for NestedLoopJoinExec { } fn partition_statistics(&self, partition: Option) -> Result { + // NestedLoopJoinExec is designed for joins without equijoin keys in the + // ON clause (e.g., `t1 JOIN t2 ON (t1.v1 + t2.v1) % 2 = 0`). Any join + // predicates are stored in `self.filter`, but `estimate_join_statistics` + // currently doesn't support selectivity estimation for such arbitrary + // filter expressions. We pass an empty join column list, which results + // in a conservative estimate based on input row counts. let join_columns = Vec::new(); + + // Left side is always a single partition (Distribution::SinglePartition), + // so we always request overall stats with `None`. Right side can have + // multiple partitions, so we forward the partition parameter to get + // partition-specific statistics when requested. 
let left_stats = self.left.partition_statistics(None)?; let right_stats = match partition { Some(partition) => self.right.partition_statistics(Some(partition))?, From 68bcbd6f2ad8e8b59e1508022067631ed3414c4b Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Thu, 25 Dec 2025 22:28:49 +0530 Subject: [PATCH 3/4] added comments to the estimate statistics function for explanations --- .../src/joins/nested_loop_join.rs | 5 ++-- datafusion/physical-plan/src/joins/utils.rs | 30 ++++++++++++++++++- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index e457a4fad9221..44637321a7e35 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -554,8 +554,9 @@ impl ExecutionPlan for NestedLoopJoinExec { // ON clause (e.g., `t1 JOIN t2 ON (t1.v1 + t2.v1) % 2 = 0`). Any join // predicates are stored in `self.filter`, but `estimate_join_statistics` // currently doesn't support selectivity estimation for such arbitrary - // filter expressions. We pass an empty join column list, which results - // in a conservative estimate based on input row counts. + // filter expressions. We pass an empty join column list, so the + // cardinality estimation cannot use join-key column statistics. Row counts + // may be unknown; `RightSemi` still gets an inexact count from the right side. let join_columns = Vec::new(); // Left side is always a single partition (Distribution::SinglePartition), diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index e0cdda6d5a729..a9243fe04e28d 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -411,7 +411,35 @@ struct PartialJoinStatistics { pub column_statistics: Vec, } -/// Estimate the statistics for the given join's output. +/// Estimates the output statistics for a join operation based on input statistics. 
+/// +/// # Statistics Propagation +/// +/// This function estimates join output statistics using the following approach: +/// - **Row count estimation**: Uses the `on` parameter (equijoin keys) to estimate +/// output cardinality via [`estimate_join_cardinality`]. The estimation is based on +/// column-level statistics (distinct counts, min/max values) of the join keys. +/// - **Column statistics**: Combines column statistics from both inputs. For join types +/// that preserve all columns (Inner, Left, Right, Full), statistics from both sides +/// are concatenated. For semi/anti joins, only the relevant side's statistics are kept. +/// - **Byte size**: Always returns `Precision::Absent` as join output size is difficult +/// to estimate without knowing the actual data. +/// +/// # The `on` Parameter +/// +/// The `on` parameter represents equijoin keys (e.g., `t1.id = t2.id`). When `on` is +/// empty (as in NestedLoopJoinExec which handles non-equijoin predicates), the +/// cardinality estimation cannot compute selectivity from join keys. The row count may +/// then be `Precision::Absent`; `RightSemi` still reports an inexact outer-side count. +/// +/// # Limitations +/// +/// - Does not account for selectivity of arbitrary join filter expressions +/// (e.g., `(t1.v1 + t2.v1) % 2 = 0`). Such filters, common in NestedLoopJoinExec, +/// are not factored into the cardinality estimation. +/// - Column statistics for the output are simply combined from inputs without +/// adjusting for join selectivity (acknowledged in the code as needing +/// "filter selectivity analysis"). 
pub(crate) fn estimate_join_statistics( left_stats: Statistics, right_stats: Statistics, From dbd1b7235902de6ab2dfc42f503c030e98673a1c Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Tue, 30 Dec 2025 18:48:20 +0530 Subject: [PATCH 4/4] updated test --- .../physical_optimizer/partition_statistics.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 2bdc9c77cae0d..615ac336145ea 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -655,6 +655,23 @@ mod test { None, )?); + // Test partition_statistics(None) - returns overall statistics + // For RightSemi join, output columns come from right side only + let full_statistics = nested_loop_join.partition_statistics(None)?; + // With empty join columns, estimate_join_statistics returns Inexact row count + // based on the outer side (right side for RightSemi) + let mut expected_full_statistics = create_partition_statistics( + 4, + 32, + 1, + 4, + Some((DATE_2025_03_01, DATE_2025_03_04)), + ); + expected_full_statistics.num_rows = Precision::Inexact(4); + expected_full_statistics.total_byte_size = Precision::Absent; + assert_eq!(full_statistics, expected_full_statistics); + + // Test partition_statistics(Some(idx)) - returns partition-specific statistics // Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02] let mut expected_statistic_partition_1 = create_partition_statistics( 2,