From a14ff05f41037f712265f565b2ebac0111e6dc35 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 22 Dec 2025 17:59:52 +0800 Subject: [PATCH 01/18] Add regression test for multi-partition empty MemTable Include a regression test that verifies the optimized plan maintains a safe distribution path through either repartitioning or sort-preserving merge. Ensure it executes correctly without panicking. Register the new test module in the physical optimizer integration test suite. --- .../aggregate_repartition.rs | 82 +++++++++++++++++++ .../core/tests/physical_optimizer/mod.rs | 1 + 2 files changed, 83 insertions(+) create mode 100644 datafusion/core/tests/physical_optimizer/aggregate_repartition.rs diff --git a/datafusion/core/tests/physical_optimizer/aggregate_repartition.rs b/datafusion/core/tests/physical_optimizer/aggregate_repartition.rs new file mode 100644 index 0000000000000..ee7bd835dd253 --- /dev/null +++ b/datafusion/core/tests/physical_optimizer/aggregate_repartition.rs @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Regression test for ensuring repartitioning between chained aggregates. + +use std::sync::Arc; + +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; +use datafusion::datasource::MemTable; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_common::Result; +use datafusion_expr::col; +use datafusion_functions_aggregate::expr_fn::count; +use datafusion_physical_plan::{collect, displayable}; + +#[tokio::test] +async fn repartition_between_chained_aggregates() -> Result<()> { + // Build a two-partition, empty MemTable with the expected schema to mimic the + // reported failing plan: Sort -> Aggregate(ts, region) -> Sort -> Aggregate(ts). + let schema = Arc::new(Schema::new(vec![ + Field::new("ts", DataType::Int64, true), + Field::new("region", DataType::Utf8, true), + Field::new("value", DataType::Int64, true), + ])); + let empty_batch = RecordBatch::new_empty(schema.clone()); + let partitions = vec![vec![empty_batch.clone()], vec![empty_batch]]; + let mem_table = MemTable::try_new(schema, partitions)?; + + let config = SessionConfig::new().with_target_partitions(2); + let ctx = SessionContext::new_with_config(config); + ctx.register_table("metrics", Arc::new(mem_table))?; + + let df = ctx + .table("metrics") + .await? + .sort(vec![ + col("ts").sort(true, true), + col("region").sort(true, true), + ])? + .aggregate(vec![col("ts"), col("region")], vec![count(col("value"))])? + .sort(vec![ + col("ts").sort(true, true), + col("region").sort(true, true), + ])? 
+ .aggregate(vec![col("ts")], vec![count(col("region"))])?; + + let physical_plan = df.create_physical_plan().await?; + + // The optimizer should either keep the stream single-partitioned via the + // sort-preserving merge, or insert a repartition between the two aggregates + // so that the second grouping sees a consistent hash distribution. Either + // path protects against the panic that was previously reported for this + // plan shape. + let plan_display = displayable(physical_plan.as_ref()).indent(true).to_string(); + let has_repartition = + plan_display.contains("RepartitionExec: partitioning=Hash([ts@0], 2)"); + assert!( + has_repartition || plan_display.contains("SortPreservingMergeExec"), + "Expected either a repartition between aggregates or a sort-preserving merge chain" + ); + + // Execute the optimized plan to ensure the empty, multi-partition pipeline does not panic. + let batches = collect(physical_plan, ctx.task_ctx()).await?; + assert!(batches.is_empty()); + + Ok(()) +} diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs index d11322cd26be9..e5fe0742d910e 100644 --- a/datafusion/core/tests/physical_optimizer/mod.rs +++ b/datafusion/core/tests/physical_optimizer/mod.rs @@ -17,6 +17,7 @@ //! Physical Optimizer integration tests +mod aggregate_repartition; #[expect(clippy::needless_pass_by_value)] mod aggregate_statistics; mod combine_partial_final_agg; From 9b5438b1689222f5498d0181e59c6b2af5dcfc25 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 22 Dec 2025 18:00:34 +0800 Subject: [PATCH 02/18] Update repartition detection and add unit test Flag unmet hash distributions when existing partitioning keys differ from the requirements, regardless of partition counts. Add a superset-hash unit test to ensure EnforceDistribution inserts the necessary repartition while preserving the original partitioning layer. --- .../enforce_distribution_superset.rs | 122 ++++++++++++++++++ .../src/enforce_distribution.rs | 12 +- 2 files changed, 131 insertions(+), 3 deletions(-) create mode 100644 datafusion/core/tests/physical_optimizer/enforce_distribution_superset.rs diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution_superset.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution_superset.rs new file mode 100644 index 0000000000000..2178a5e0fb742 --- /dev/null +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution_superset.rs @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::sync::Arc; + +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use crate::physical_optimizer::test_utils::memory_exec; +use datafusion_common::config::ConfigOptions; +use datafusion_common::{JoinType, NullEquality, Result}; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::{Partitioning, physical_exprs_equal}; +use datafusion_physical_optimizer::enforce_distribution::EnforceDistribution; +use datafusion_physical_plan::joins::utils::JoinOn; +use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; +use datafusion_physical_plan::repartition::RepartitionExec; + +fn repartition_superset_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("region", DataType::Utf8, true), + Field::new("ts", DataType::Int64, true), + ])) +} + +#[test] +fn enforce_distribution_repartitions_superset_hash_keys() -> Result<()> { + let schema = repartition_superset_schema(); + let left = memory_exec(&schema); + let right = memory_exec(&schema); + + let left_superset = Arc::new(RepartitionExec::try_new( + Arc::clone(&left), + Partitioning::Hash( + vec![ + Arc::new(Column::new_with_schema("region", &schema).unwrap()), + Arc::new(Column::new_with_schema("ts", &schema).unwrap()), + ], + 4, + ), + )?); + + let right_partitioned = Arc::new(RepartitionExec::try_new( + right, + Partitioning::Hash( + vec![Arc::new(Column::new_with_schema("ts", &schema).unwrap())], + 4, + ), + )?); + + let on: JoinOn = vec![ + ( + Arc::new(Column::new_with_schema("ts", &schema).unwrap()), + Arc::new(Column::new_with_schema("ts", &schema).unwrap()), + ), + ]; + + let join = Arc::new(HashJoinExec::try_new( + left_superset, + right_partitioned, + on, + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + NullEquality::NullEqualsNothing, + )?); + + let mut config = ConfigOptions::new(); + config.execution.target_partitions = 4; + + let optimized = EnforceDistribution::new().optimize(join, &config)?; + let join = optimized + .as_any() + .downcast_ref::() + .expect("expected hash join"); + + let repartition = join + .left() + .as_any() + .downcast_ref::() + .expect("left side should be repartitioned"); + + match repartition.partitioning() { + Partitioning::Hash(exprs, partitions) => { + assert_eq!(*partitions, 4); + let expected = + vec![Arc::new(Column::new_with_schema("ts", &schema).unwrap()) as _]; + assert!( + physical_exprs_equal(exprs, &expected), + "expected repartitioning on [ts]" + ); + } + other => panic!("expected hash repartitioning, got {other:?}"), + } + + // The original (superset) partitioning should remain below the new repartition + // so we can rehash to the required keys even when the partition counts already match. 
+ let original = repartition + .input() + .as_any() + .downcast_ref::() + .expect("expected original repartition to remain"); + assert!(matches!( + original.partitioning(), + Partitioning::Hash(_, 4) + )); + + Ok(()) +} diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 6120e1f3b5826..054ed92be9b75 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -1118,6 +1118,7 @@ fn get_repartition_requirement_status( for (child, requirement, roundrobin_beneficial) in izip!(children.into_iter(), requirements, rr_beneficial) { + let output_partitioning = child.output_partitioning(); // Decide whether adding a round robin is beneficial depending on // the statistical information we have on the number of rows: let roundrobin_beneficial_stats = match child.partition_statistics(None)?.num_rows @@ -1127,18 +1128,23 @@ fn get_repartition_requirement_status( Precision::Absent => true, }; let is_hash = matches!(requirement, Distribution::HashPartitioned(_)); + let hash_requirement_unmet = is_hash + && !output_partitioning + .satisfaction(&requirement, child.equivalence_properties(), false) + .is_satisfied(); // Hash re-partitioning is necessary when the input has more than one // partitions: - let multi_partitions = child.output_partitioning().partition_count() > 1; + let multi_partitions = output_partitioning.partition_count() > 1; let roundrobin_sensible = roundrobin_beneficial && roundrobin_beneficial_stats; - needs_alignment |= is_hash && (multi_partitions || roundrobin_sensible); + needs_alignment |= is_hash + && (multi_partitions || roundrobin_sensible || hash_requirement_unmet); repartition_status_flags.push(( is_hash, RepartitionRequirementStatus { requirement, roundrobin_beneficial, roundrobin_beneficial_stats, - hash_necessary: is_hash && multi_partitions, + hash_necessary: is_hash && (multi_partitions || hash_requirement_unmet), }, )); } From 68b565cb934590f8b8ff7ad1d96f954352c1955d Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 22 Dec 2025 18:10:04 +0800 Subject: [PATCH 03/18] Refactor distribution satisfaction checks Add a shared DistributionSatisfactionResult helper to streamline partitioning satisfaction checks. Update the SanityCheckPlan to utilize this helper and re-export PartitioningSatisfaction for better cross-crate reuse. Include unit tests for exact, subset, and superset hash partitioning scenarios to ensure consistent enforcement and sanity checking interpretations. 
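For reviewers, a minimal sketch of the intended call pattern for the shared helper, using only the signatures added in the diff below (distribution_satisfaction, is_satisfied, output_partitioning). The standalone check_child_distribution wrapper is illustrative and not part of this change; it mirrors how SanityCheckPlan consumes the helper, with allow_subset = true preserving the sanity checker's existing subset-hash behaviour.

    use std::sync::Arc;

    use datafusion_common::{plan_err, Result};
    use datafusion_physical_expr::Distribution;
    use datafusion_physical_optimizer::enforce_distribution::distribution_satisfaction;
    use datafusion_physical_plan::execution_plan::ExecutionPlan;

    /// Reject a child whose output partitioning does not satisfy `dist_req`.
    /// `allow_subset = true` lets a subset of the required hash keys count as
    /// satisfied, matching the sanity checker's historical interpretation.
    fn check_child_distribution(
        child: &Arc<dyn ExecutionPlan>,
        dist_req: &Distribution,
    ) -> Result<()> {
        let satisfaction = distribution_satisfaction(child, dist_req, true);
        if !satisfaction.is_satisfied() {
            return plan_err!(
                "child output partitioning {} does not satisfy {}",
                satisfaction.output_partitioning(),
                dist_req
            );
        }
        Ok(())
    }

EnforceDistribution uses the same result type but passes the configured allow_subset_satisfy_partitioning flag and additionally consults requires_repartition to decide whether a hash repartition must be inserted.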
--- .../enforce_distribution.rs | 79 ++++++- datafusion/physical-expr/src/lib.rs | 2 +- .../src/enforce_distribution.rs | 192 +++++++++--------- .../physical-optimizer/src/sanity_checker.rs | 10 +- 4 files changed, 173 insertions(+), 110 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 4815f4a8a8a36..e725a4cb716c6 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -44,6 +44,8 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_datasource::file_groups::FileGroup; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_expr::{JoinType, Operator}; +use datafusion_physical_expr::Distribution; +use datafusion_physical_expr::PartitioningSatisfaction; use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, binary, lit}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{ @@ -52,7 +54,9 @@ use datafusion_physical_expr_common::sort_expr::{ use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_optimizer::enforce_distribution::*; use datafusion_physical_optimizer::enforce_sorting::EnforceSorting; +use datafusion_physical_optimizer::output_requirements::OutputRequirementExec; use datafusion_physical_optimizer::output_requirements::OutputRequirements; +use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan; use datafusion_physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; @@ -64,11 +68,12 @@ use datafusion_physical_plan::filter::FilterExec; use datafusion_physical_plan::joins::utils::JoinOn; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; +use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::union::UnionExec; use datafusion_physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlanProperties, PlanProperties, Statistics, - displayable, + DisplayAs, DisplayFormatType, ExecutionPlanProperties, Partitioning, PlanProperties, + Statistics, displayable, }; use insta::Settings; @@ -3609,6 +3614,76 @@ async fn test_distribute_sort_memtable() -> Result<()> { Ok(()) } +#[test] +fn distribution_satisfaction_exact_hash_matches_sanity_check() -> Result<()> { + let schema = schema(); + let col_a: Arc = Arc::new(Column::new_with_schema("a", &schema)?); + let col_b: Arc = Arc::new(Column::new_with_schema("b", &schema)?); + + assert_hash_satisfaction_alignment( + vec![Arc::clone(&col_a), Arc::clone(&col_b)], + vec![col_a, col_b], + PartitioningSatisfaction::Exact, + false, + ) +} + +#[test] +fn distribution_satisfaction_subset_hash_matches_sanity_check() -> Result<()> { + let schema = schema(); + let col_a: Arc = Arc::new(Column::new_with_schema("a", &schema)?); + let col_b: Arc = Arc::new(Column::new_with_schema("b", &schema)?); + + assert_hash_satisfaction_alignment( + vec![Arc::clone(&col_a)], + vec![col_a, col_b], + PartitioningSatisfaction::Subset, + false, + ) +} + +#[test] +fn distribution_satisfaction_superset_hash_matches_sanity_check() -> Result<()> { + let schema = schema(); + let col_a: Arc = Arc::new(Column::new_with_schema("a", &schema)?); + let col_b: Arc 
= Arc::new(Column::new_with_schema("b", &schema)?); + + assert_hash_satisfaction_alignment( + vec![Arc::clone(&col_a), Arc::clone(&col_b)], + vec![col_a], + PartitioningSatisfaction::NotSatisfied, + true, + ) +} + +fn assert_hash_satisfaction_alignment( + partitioning_exprs: Vec>, + required_exprs: Vec>, + expected: PartitioningSatisfaction, + expect_err: bool, +) -> Result<()> { + let child: Arc = Arc::new(RepartitionExec::try_new( + parquet_exec(), + Partitioning::Hash(partitioning_exprs, 4), + )?); + + let requirement = Distribution::HashPartitioned(required_exprs); + let satisfaction = distribution_satisfaction(&child, &requirement, true); + assert_eq!(satisfaction.satisfaction(), expected); + + let parent: Arc = + Arc::new(OutputRequirementExec::new(child, None, requirement, None)); + let sanity = SanityCheckPlan::new().optimize(parent, &ConfigOptions::default()); + + if expect_err { + assert!(sanity.is_err()); + } else { + sanity?; + } + + Ok(()) +} + /// Create a [`MemTable`] with 100 batches of 8192 rows each, in 1 partition fn create_memtable() -> Result { let mut batches = Vec::with_capacity(100); diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 988e14c28e17c..8aab3fd2ff0a6 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -58,7 +58,7 @@ pub use analysis::{AnalysisContext, ExprBoundaries, analyze}; pub use equivalence::{ AcrossPartitions, ConstExpr, EquivalenceProperties, calculate_union, }; -pub use partitioning::{Distribution, Partitioning}; +pub use partitioning::{Distribution, Partitioning, PartitioningSatisfaction}; pub use physical_expr::{ add_offset_to_expr, add_offset_to_physical_sort_exprs, create_lex_ordering, create_ordering, create_physical_sort_expr, create_physical_sort_exprs, diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 054ed92be9b75..ed04ed48d594b 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -40,7 +40,8 @@ use datafusion_expr::logical_plan::JoinType; use datafusion_physical_expr::expressions::{Column, NoOp}; use datafusion_physical_expr::utils::map_columns_before_projection; use datafusion_physical_expr::{ - EquivalenceProperties, PhysicalExpr, PhysicalExprRef, physical_exprs_equal, + EquivalenceProperties, PartitioningSatisfaction, PhysicalExpr, PhysicalExprRef, + physical_exprs_equal, }; use datafusion_physical_plan::ExecutionPlanProperties; use datafusion_physical_plan::aggregates::{ @@ -831,6 +832,60 @@ fn new_join_conditions( .collect() } +#[derive(Debug, Clone)] +pub struct DistributionSatisfactionResult { + requirement: Distribution, + output_partitioning: Partitioning, + satisfaction: PartitioningSatisfaction, +} + +impl DistributionSatisfactionResult { + pub fn is_satisfied(&self) -> bool { + self.satisfaction.is_satisfied() + } + + pub fn satisfaction(&self) -> PartitioningSatisfaction { + self.satisfaction + } + + pub fn output_partitioning(&self) -> &Partitioning { + &self.output_partitioning + } + + pub fn required_distribution(&self) -> &Distribution { + &self.requirement + } + + pub fn requires_repartition( + &self, + allow_subset: bool, + target_partitions: usize, + ) -> bool { + !self.is_satisfied() + || (!allow_subset + && target_partitions > self.output_partitioning.partition_count()) + } +} + +pub fn distribution_satisfaction( + plan: &Arc, + requirement: &Distribution, + 
allow_subset: bool, +) -> DistributionSatisfactionResult { + let output_partitioning = plan.output_partitioning().clone(); + let satisfaction = output_partitioning.satisfaction( + requirement, + plan.equivalence_properties(), + allow_subset, + ); + + DistributionSatisfactionResult { + requirement: requirement.clone(), + output_partitioning, + satisfaction, + } +} + /// Adds RoundRobin repartition operator to the plan increase parallelism. /// /// # Arguments @@ -890,7 +945,7 @@ fn add_roundrobin_on_top( /// distribution is satisfied by adding a Hash repartition. fn add_hash_on_top( input: DistributionContext, - hash_exprs: Vec>, + satisfaction: DistributionSatisfactionResult, n_target: usize, allow_subset_satisfy_partitioning: bool, ) -> Result { @@ -900,22 +955,11 @@ fn add_hash_on_top( return Ok(input); } - let dist = Distribution::HashPartitioned(hash_exprs); - let satisfaction = input.plan.output_partitioning().satisfaction( - &dist, - input.plan.equivalence_properties(), - allow_subset_satisfy_partitioning, - ); - // Add hash repartitioning when: // - When subset satisfaction is enabled (current >= threshold): only repartition if not satisfied // - When below threshold (current < threshold): repartition if expressions don't match OR to increase parallelism - let needs_repartition = if allow_subset_satisfy_partitioning { - !satisfaction.is_satisfied() - } else { - !satisfaction.is_satisfied() - || n_target > input.plan.output_partitioning().partition_count() - }; + let needs_repartition = + satisfaction.requires_repartition(allow_subset_satisfy_partitioning, n_target); if needs_repartition { // When there is an existing ordering, we preserve ordering during @@ -925,7 +969,10 @@ fn add_hash_on_top( // requirements. // - Usage of order preserving variants is not desirable (per the flag // `config.optimizer.prefer_existing_sort`). - let partitioning = dist.create_partitioning(n_target); + let partitioning = satisfaction + .required_distribution() + .clone() + .create_partitioning(n_target); let repartition = RepartitionExec::try_new(Arc::clone(&input.plan), partitioning)? .with_preserve_order(); @@ -1070,100 +1117,37 @@ struct RepartitionRequirementStatus { /// Designates whether round robin partitioning is beneficial according to /// the statistical information we have on the number of rows. roundrobin_beneficial_stats: bool, - /// Designates whether hash partitioning is necessary. - hash_necessary: bool, } -/// Calculates the `RepartitionRequirementStatus` for each children to generate +/// Calculates the `RepartitionRequirementStatus` for each child to generate /// consistent and sensible (in terms of performance) distribution requirements. -/// As an example, a hash join's left (build) child might produce -/// -/// ```text -/// RepartitionRequirementStatus { -/// .., -/// hash_necessary: true -/// } -/// ``` -/// -/// while its right (probe) child might have very few rows and produce: -/// -/// ```text -/// RepartitionRequirementStatus { -/// .., -/// hash_necessary: false -/// } -/// ``` -/// -/// These statuses are not consistent as all children should agree on hash -/// partitioning. This function aligns the statuses to generate consistent -/// hash partitions for each children. 
After alignment, the right child's -/// status would turn into: -/// -/// ```text -/// RepartitionRequirementStatus { -/// .., -/// hash_necessary: true -/// } -/// ``` fn get_repartition_requirement_status( plan: &Arc, batch_size: usize, should_use_estimates: bool, ) -> Result> { - let mut needs_alignment = false; let children = plan.children(); let rr_beneficial = plan.benefits_from_input_partitioning(); let requirements = plan.required_input_distribution(); - let mut repartition_status_flags = vec![]; - for (child, requirement, roundrobin_beneficial) in - izip!(children.into_iter(), requirements, rr_beneficial) - { - let output_partitioning = child.output_partitioning(); - // Decide whether adding a round robin is beneficial depending on - // the statistical information we have on the number of rows: - let roundrobin_beneficial_stats = match child.partition_statistics(None)?.num_rows - { - Precision::Exact(n_rows) => n_rows > batch_size, - Precision::Inexact(n_rows) => !should_use_estimates || (n_rows > batch_size), - Precision::Absent => true, - }; - let is_hash = matches!(requirement, Distribution::HashPartitioned(_)); - let hash_requirement_unmet = is_hash - && !output_partitioning - .satisfaction(&requirement, child.equivalence_properties(), false) - .is_satisfied(); - // Hash re-partitioning is necessary when the input has more than one - // partitions: - let multi_partitions = output_partitioning.partition_count() > 1; - let roundrobin_sensible = roundrobin_beneficial && roundrobin_beneficial_stats; - needs_alignment |= is_hash - && (multi_partitions || roundrobin_sensible || hash_requirement_unmet); - repartition_status_flags.push(( - is_hash, - RepartitionRequirementStatus { + izip!(children.into_iter(), requirements, rr_beneficial) + .map(|(child, requirement, roundrobin_beneficial)| { + // Decide whether adding a round robin is beneficial depending on + // the statistical information we have on the number of rows: + let roundrobin_beneficial_stats = + match child.partition_statistics(None)?.num_rows { + Precision::Exact(n_rows) => n_rows > batch_size, + Precision::Inexact(n_rows) => { + !should_use_estimates || (n_rows > batch_size) + } + Precision::Absent => true, + }; + Ok(RepartitionRequirementStatus { requirement, roundrobin_beneficial, roundrobin_beneficial_stats, - hash_necessary: is_hash && (multi_partitions || hash_requirement_unmet), - }, - )); - } - // Align hash necessary flags for hash partitions to generate consistent - // hash partitions at each children: - if needs_alignment { - // When there is at least one hash requirement that is necessary or - // beneficial according to statistics, make all children require hash - // repartitioning: - for (is_hash, status) in &mut repartition_status_flags { - if *is_hash { - status.hash_necessary = true; - } - } - } - Ok(repartition_status_flags - .into_iter() - .map(|(_, status)| status) - .collect()) + }) + }) + .collect() } /// This function checks whether we need to add additional data exchange @@ -1287,7 +1271,6 @@ pub fn ensure_distribution( requirement, roundrobin_beneficial, roundrobin_beneficial_stats, - hash_necessary, }, )| { let increases_partition_count = @@ -1327,13 +1310,20 @@ pub fn ensure_distribution( Distribution::SinglePartition => { child = add_merge_on_top(child); } - Distribution::HashPartitioned(exprs) => { + Distribution::HashPartitioned(_) => { // See https://github.com/apache/datafusion/issues/18341#issuecomment-3503238325 for background - // When inserting hash is necessary to satisfy hash 
requirement, insert hash repartition. - if hash_necessary { + let satisfaction = distribution_satisfaction( + &child.plan, + &requirement, + allow_subset_satisfy_partitioning, + ); + if satisfaction.requires_repartition( + allow_subset_satisfy_partitioning, + target_partitions, + ) { child = add_hash_on_top( child, - exprs.to_vec(), + satisfaction, target_partitions, allow_subset_satisfy_partitioning, )?; diff --git a/datafusion/physical-optimizer/src/sanity_checker.rs b/datafusion/physical-optimizer/src/sanity_checker.rs index bff33a281556d..29bbade540cb6 100644 --- a/datafusion/physical-optimizer/src/sanity_checker.rs +++ b/datafusion/physical-optimizer/src/sanity_checker.rs @@ -35,6 +35,7 @@ use datafusion_physical_plan::joins::SymmetricHashJoinExec; use datafusion_physical_plan::{ExecutionPlanProperties, get_plan_string}; use crate::PhysicalOptimizerRule; +use crate::enforce_distribution::distribution_satisfaction; use datafusion_physical_expr_common::sort_expr::format_physical_sort_requirement_list; use itertools::izip; @@ -151,18 +152,15 @@ pub fn check_plan_sanity( } } - if !child - .output_partitioning() - .satisfaction(&dist_req, child_eq_props, true) - .is_satisfied() - { + let dist_satisfaction = distribution_satisfaction(&child, &dist_req, true); + if !dist_satisfaction.is_satisfied() { let plan_str = get_plan_string(&plan); return plan_err!( "Plan: {:?} does not satisfy distribution requirements: {}. Child-{} output partitioning: {}", plan_str, dist_req, idx, - child.output_partitioning() + dist_satisfaction.output_partitioning() ); } } From 389178259be7249bb607384b671c40d3f8d59372 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 22 Dec 2025 18:57:31 +0800 Subject: [PATCH 04/18] clippy fix --- datafusion/physical-optimizer/src/enforce_distribution.rs | 4 ++-- datafusion/physical-optimizer/src/sanity_checker.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index ed04ed48d594b..68971c51b82cb 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -945,7 +945,7 @@ fn add_roundrobin_on_top( /// distribution is satisfied by adding a Hash repartition. 
fn add_hash_on_top( input: DistributionContext, - satisfaction: DistributionSatisfactionResult, + satisfaction: &DistributionSatisfactionResult, n_target: usize, allow_subset_satisfy_partitioning: bool, ) -> Result { @@ -1323,7 +1323,7 @@ pub fn ensure_distribution( ) { child = add_hash_on_top( child, - satisfaction, + &satisfaction, target_partitions, allow_subset_satisfy_partitioning, )?; diff --git a/datafusion/physical-optimizer/src/sanity_checker.rs b/datafusion/physical-optimizer/src/sanity_checker.rs index 29bbade540cb6..ae5fcc900c1ab 100644 --- a/datafusion/physical-optimizer/src/sanity_checker.rs +++ b/datafusion/physical-optimizer/src/sanity_checker.rs @@ -152,7 +152,7 @@ pub fn check_plan_sanity( } } - let dist_satisfaction = distribution_satisfaction(&child, &dist_req, true); + let dist_satisfaction = distribution_satisfaction(child, &dist_req, true); if !dist_satisfaction.is_satisfied() { let plan_str = get_plan_string(&plan); return plan_err!( From 0a16df1617d6e424dfc51cbc4fb9a413d1ff4c56 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 22 Dec 2025 22:20:09 +0800 Subject: [PATCH 05/18] Restore handling of satisfied-distribution Reinstate logic to skip unnecessary repartitioning for single-partition joins, window partitions, and grouped unions, matching previous behavior and maintaining regression expectations. --- .../enforce_distribution.rs | 88 ++++++++++++++++++- .../src/enforce_distribution.rs | 9 +- 2 files changed, 92 insertions(+), 5 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index e725a4cb716c6..d09ad945f41e9 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -20,8 +20,8 @@ use std::ops::Deref; use std::sync::Arc; use crate::physical_optimizer::test_utils::{ - check_integrity, coalesce_partitions_exec, parquet_exec_with_sort, - parquet_exec_with_stats, repartition_exec, schema, sort_exec, + bounded_window_exec_with_partition, check_integrity, coalesce_partitions_exec, + parquet_exec_with_sort, parquet_exec_with_stats, repartition_exec, schema, sort_exec, sort_exec_with_preserve_partitioning, sort_merge_join_exec, sort_preserving_merge_exec, union_exec, }; @@ -62,15 +62,18 @@ use datafusion_physical_plan::aggregates::{ }; use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion_physical_plan::empty::EmptyExec; use datafusion_physical_plan::execution_plan::ExecutionPlan; use datafusion_physical_plan::expressions::col; use datafusion_physical_plan::filter::FilterExec; +use datafusion_physical_plan::joins::HashJoinExec; use datafusion_physical_plan::joins::utils::JoinOn; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::union::UnionExec; +use datafusion_physical_plan::windows::BoundedWindowAggExec; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlanProperties, Partitioning, PlanProperties, Statistics, displayable, @@ -3656,6 +3659,87 @@ fn distribution_satisfaction_superset_hash_matches_sanity_check() -> Result<()> ) } +#[test] +fn 
single_partition_join_skips_repartition() -> Result<()> { + let left = parquet_exec(); + let right = parquet_exec(); + let join_on = vec![( + Arc::new(Column::new_with_schema("a", &schema()).unwrap()) as _, + Arc::new(Column::new_with_schema("a", &schema()).unwrap()) as _, + )]; + + let plan = hash_join_exec(left, right, &join_on, &JoinType::Inner); + let config = TestConfig::default().with_query_execution_partitions(16); + let optimized = config.to_plan(plan, &[Run::Distribution]); + + assert_plan!( + optimized, + @r" + HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + " + ); + + Ok(()) +} + +#[test] +fn single_partition_window_partition_skips_repartition() -> Result<()> { + let schema = schema(); + let sort_exprs = vec![PhysicalSortExpr { + expr: col("a", &schema)?, + options: SortOptions::default(), + }]; + let partition_by = vec![col("b", &schema)?]; + + let window_plan = bounded_window_exec_with_partition( + "c", + sort_exprs.clone(), + &partition_by, + parquet_exec(), + ); + let config = TestConfig::default().with_query_execution_partitions(12); + let optimized = config.to_plan(window_plan, &[Run::Distribution, Run::Sorting]); + + assert_plan!( + optimized, + @r#" + BoundedWindowAggExec: wdw=[count: Field { "count": Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] + SortExec: expr=[b@1 ASC NULLS LAST, a@0 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + "# + ); + + Ok(()) +} + +#[test] +fn grouped_union_from_single_partition_skips_repartition() -> Result<()> { + let union = union_exec(vec![ + parquet_exec(), + Arc::new(EmptyExec::new(schema()).with_partitions(0)), + ]); + let aggregated = + aggregate_exec_with_alias(union, vec![("a".to_string(), "group_a".to_string())]); + let mut config = TestConfig::default().with_query_execution_partitions(10); + config.config.optimizer.enable_round_robin_repartition = false; + let optimized = config.to_plan(aggregated, &[Run::Distribution]); + + assert_plan!( + optimized, + @r" + AggregateExec: mode=FinalPartitioned, gby=[group_a@0 as group_a], aggr=[] + AggregateExec: mode=Partial, gby=[a@0 as group_a], aggr=[] + UnionExec + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + EmptyExec + " + ); + + Ok(()) +} + fn assert_hash_satisfaction_alignment( partitioning_exprs: Vec>, required_exprs: Vec>, diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 68971c51b82cb..e2ca4227619a4 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -861,9 +861,12 @@ impl DistributionSatisfactionResult { allow_subset: bool, target_partitions: usize, ) -> bool { - !self.is_satisfied() - || (!allow_subset - && target_partitions > self.output_partitioning.partition_count()) + (match self.satisfaction { + PartitioningSatisfaction::Exact => false, + PartitioningSatisfaction::Subset => !allow_subset, + PartitioningSatisfaction::NotSatisfied => true, + }) || !allow_subset + && target_partitions > self.output_partitioning.partition_count() } } From ed6e9c7fd0e5a5cb9c85b21678b4da346d80b8cf Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 23 Dec 
2025 15:57:08 +0800 Subject: [PATCH 06/18] Add #18989 reproducer case --- datafusion-examples/examples/issue_18989.rs | 53 +++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 datafusion-examples/examples/issue_18989.rs diff --git a/datafusion-examples/examples/issue_18989.rs b/datafusion-examples/examples/issue_18989.rs new file mode 100644 index 0000000000000..7237261bd4b7c --- /dev/null +++ b/datafusion-examples/examples/issue_18989.rs @@ -0,0 +1,53 @@ +use std::sync::Arc; + +use datafusion::arrow::datatypes::{DataType, Field, Schema}; +use datafusion::datasource::MemTable; +use datafusion::functions_aggregate::count::count_udaf; +use datafusion::logical_expr::col; +use datafusion::prelude::*; + +#[tokio::main] +async fn main() { + let ctx = SessionContext::default(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("ts", DataType::Int64, false), + Field::new("region", DataType::Utf8, false), + Field::new("value", DataType::Float64, false), + ])); + + // create an empty but multi-partitioned MemTable + let mem_table = MemTable::try_new(schema.clone(), vec![vec![], vec![]]).unwrap(); + ctx.register_table("metrics", Arc::new(mem_table)).unwrap(); + + // aggregate and sort twice + let data_frame = ctx + .table("metrics") + .await + .unwrap() + .aggregate( + vec![col("region"), col("ts")], + vec![count_udaf().call(vec![col("value")])], + ) + .unwrap() + .sort(vec![ + col("region").sort(true, true), + col("ts").sort(true, true), + ]) + .unwrap() + .aggregate( + vec![col("ts")], + vec![count_udaf().call(vec![col("count(metrics.value)")])], + ) + .unwrap() + .sort(vec![col("ts").sort(true, true)]) + .unwrap(); + + println!( + "Logical Plan:\n{}", + data_frame.logical_plan().display_indent() + ); + + data_frame.show().await.unwrap(); + println!("should not panic and print data_frame") +} From cf349003ee564b3da58de83adadca42a32d2e3ed Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 23 Dec 2025 16:00:39 +0800 Subject: [PATCH 07/18] Refactor condition in DistributionSatisfactionResult for clarity --- datafusion/physical-optimizer/src/enforce_distribution.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index e2ca4227619a4..718db8f9ab62d 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -865,8 +865,8 @@ impl DistributionSatisfactionResult { PartitioningSatisfaction::Exact => false, PartitioningSatisfaction::Subset => !allow_subset, PartitioningSatisfaction::NotSatisfied => true, - }) || !allow_subset - && target_partitions > self.output_partitioning.partition_count() + }) || (!allow_subset + && target_partitions > self.output_partitioning.partition_count()) } } From c81cac3e833ed728d340d2ccaad2ba972274859f Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 23 Dec 2025 16:01:07 +0800 Subject: [PATCH 08/18] Update test results --- .../test_files/count_star_rule.slt | 5 ++-- datafusion/sqllogictest/test_files/cte.slt | 6 +++-- .../sqllogictest/test_files/explain_tree.slt | 19 ++++++++++---- datafusion/sqllogictest/test_files/limit.slt | 4 +-- .../sqllogictest/test_files/qualify.slt | 5 ++-- .../test_files/sort_merge_join.slt | 16 +++++++----- datafusion/sqllogictest/test_files/union.slt | 26 +++++++++++-------- 7 files changed, 50 insertions(+), 31 deletions(-) diff --git a/datafusion/sqllogictest/test_files/count_star_rule.slt 
b/datafusion/sqllogictest/test_files/count_star_rule.slt index a1c0e6303a765..d54b8a3e6fecb 100644 --- a/datafusion/sqllogictest/test_files/count_star_rule.slt +++ b/datafusion/sqllogictest/test_files/count_star_rule.slt @@ -84,8 +84,9 @@ logical_plan physical_plan 01)ProjectionExec: expr=[a@0 as a, count(Int64(1)) PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as count_a] 02)--WindowAggExec: wdw=[count(Int64(1)) PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "count(Int64(1)) PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64 }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }] -03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] -04)------DataSourceExec: partitions=1, partition_sizes=[1] +03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] +04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 +05)--------DataSourceExec: partitions=1, partition_sizes=[1] query II SELECT a, COUNT() OVER (PARTITION BY a) AS count_a FROM t1 ORDER BY a; diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt index 853dbd2e24aac..a40aee97f1887 100644 --- a/datafusion/sqllogictest/test_files/cte.slt +++ b/datafusion/sqllogictest/test_files/cte.slt @@ -910,8 +910,10 @@ logical_plan 05)------TableScan: person projection=[id] physical_plan 01)HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(id@0, id@0)] -02)--DataSourceExec: partitions=1, partition_sizes=[0] -03)--DataSourceExec: partitions=1, partition_sizes=[0] +02)--RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=1 +03)----DataSourceExec: partitions=1, partition_sizes=[0] +04)--RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=1 +05)----DataSourceExec: partitions=1, partition_sizes=[0] statement count 0 drop table person; diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index 9215ce87e3bef..2eceb6cce6bf5 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -1204,12 +1204,21 @@ physical_plan 09)│ c1@0 ASC ││ c1@0 ASC │ 10)└─────────────┬─────────────┘└─────────────┬─────────────┘ 11)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ -12)│ DataSourceExec ││ DataSourceExec │ +12)│ RepartitionExec ││ RepartitionExec │ 13)│ -------------------- ││ -------------------- │ -14)│ bytes: 5932 ││ bytes: 5932 │ -15)│ format: memory ││ format: memory │ -16)│ rows: 1 ││ rows: 1 │ -17)└───────────────────────────┘└───────────────────────────┘ +14)│ partition_count(in->out): ││ partition_count(in->out): │ +15)│ 1 -> 4 ││ 1 -> 4 │ +16)│ ││ │ +17)│ partitioning_scheme: ││ partitioning_scheme: │ +18)│ Hash([c1@0], 4) ││ Hash([c1@0], 4) │ +19)└─────────────┬─────────────┘└─────────────┬─────────────┘ +20)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ +21)│ DataSourceExec ││ DataSourceExec │ +22)│ -------------------- ││ -------------------- │ +23)│ bytes: 5932 ││ bytes: 5932 │ +24)│ format: memory ││ format: memory │ +25)│ rows: 1 ││ rows: 1 │ +26)└───────────────────────────┘└───────────────────────────┘ statement ok set datafusion.optimizer.prefer_hash_join = true; diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index 
524304546d569..8f4df2b06f040 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -404,9 +404,9 @@ logical_plan 02)--TableScan: t1000 projection=[i] physical_plan 01)AggregateExec: mode=FinalPartitioned, gby=[i@0 as i], aggr=[] -02)--RepartitionExec: partitioning=Hash([i@0], 4), input_partitions=1 +02)--RepartitionExec: partitioning=Hash([i@0], 4), input_partitions=4 03)----AggregateExec: mode=Partial, gby=[i@0 as i], aggr=[] -04)------DataSourceExec: partitions=1 +04)------DataSourceExec: partitions=4 statement ok set datafusion.explain.show_sizes = true; diff --git a/datafusion/sqllogictest/test_files/qualify.slt b/datafusion/sqllogictest/test_files/qualify.slt index ce58e3998cf57..0053646abffa2 100644 --- a/datafusion/sqllogictest/test_files/qualify.slt +++ b/datafusion/sqllogictest/test_files/qualify.slt @@ -292,8 +292,9 @@ physical_plan 02)--FilterExec: row_number() PARTITION BY [users.dept] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@0 > 1 03)----ProjectionExec: expr=[row_number() PARTITION BY [users.dept] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as row_number() PARTITION BY [users.dept] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING] 04)------BoundedWindowAggExec: wdw=[row_number() PARTITION BY [users.dept] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() PARTITION BY [users.dept] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted] -05)--------SortExec: expr=[dept@0 ASC NULLS LAST], preserve_partitioning=[false] -06)----------DataSourceExec: partitions=1, partition_sizes=[1] +05)--------SortExec: expr=[dept@0 ASC NULLS LAST], preserve_partitioning=[true] +06)----------RepartitionExec: partitioning=Hash([dept@0], 4), input_partitions=1 +07)------------DataSourceExec: partitions=1, partition_sizes=[1] # plan with window function and group by query TT diff --git a/datafusion/sqllogictest/test_files/sort_merge_join.slt b/datafusion/sqllogictest/test_files/sort_merge_join.slt index 5f9276bdb78ec..dacc3bbe1f98b 100644 --- a/datafusion/sqllogictest/test_files/sort_merge_join.slt +++ b/datafusion/sqllogictest/test_files/sort_merge_join.slt @@ -38,10 +38,12 @@ logical_plan 03)--TableScan: t2 projection=[a, b] physical_plan 01)SortMergeJoin: join_type=Inner, on=[(a@0, a@0)], filter=CAST(b@1 AS Int64) * 50 <= CAST(b@0 AS Int64) -02)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false] -03)----DataSourceExec: partitions=1, partition_sizes=[1] -04)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false] -05)----DataSourceExec: partitions=1, partition_sizes=[1] +02)--SortExec: expr=[a@0 ASC], preserve_partitioning=[true] +03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 +04)------DataSourceExec: partitions=1, partition_sizes=[1] +05)--SortExec: expr=[a@0 ASC], preserve_partitioning=[true] +06)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 +07)------DataSourceExec: partitions=1, partition_sizes=[1] # inner join with join filter query TITI rowsort @@ -918,14 +920,14 @@ statement ok CREATE TABLE t3(x int) AS VALUES (1); query IIIII -SELECT * FROM t2 RIGHT JOIN t1 on t1.a = t2.a AND t1.b < t2.b +SELECT * FROM t2 RIGHT JOIN t1 on t1.a = t2.a AND t1.b < t2.b order by 1, 4, 5 ---- -NULL NULL NULL 1 100 2 250 3001 2 200 +NULL NULL NULL 1 100 NULL NULL NULL 3 300 query IIIII -SELECT * FROM t1 LEFT JOIN t2 on t1.a = t2.a AND t1.b < 
t2.b +SELECT * FROM t1 LEFT JOIN t2 on t1.a = t2.a AND t1.b < t2.b order by 1 ---- 1 100 NULL NULL NULL 2 200 2 250 3001 diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index b79b6d2fe5e9e..29b03c5e2506a 100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -613,12 +613,14 @@ logical_plan physical_plan 01)UnionExec 02)--ProjectionExec: expr=[Int64(1)@0 as a] -03)----AggregateExec: mode=SinglePartitioned, gby=[1 as Int64(1)], aggr=[], ordering_mode=Sorted -04)------PlaceholderRowExec -05)--ProjectionExec: expr=[2 as a] -06)----PlaceholderRowExec -07)--ProjectionExec: expr=[3 as a] +03)----AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1)], aggr=[], ordering_mode=Sorted +04)------RepartitionExec: partitioning=Hash([Int64(1)@0], 4), input_partitions=1 +05)--------AggregateExec: mode=Partial, gby=[1 as Int64(1)], aggr=[], ordering_mode=Sorted +06)----------PlaceholderRowExec +07)--ProjectionExec: expr=[2 as a] 08)----PlaceholderRowExec +09)--ProjectionExec: expr=[3 as a] +10)----PlaceholderRowExec # test UNION ALL aliases correctly with aliased subquery query TT @@ -641,12 +643,14 @@ logical_plan physical_plan 01)UnionExec 02)--ProjectionExec: expr=[count(Int64(1))@1 as count, n@0 as n] -03)----AggregateExec: mode=SinglePartitioned, gby=[n@0 as n], aggr=[count(Int64(1))], ordering_mode=Sorted -04)------ProjectionExec: expr=[5 as n] -05)--------PlaceholderRowExec -06)--ProjectionExec: expr=[1 as count, max(Int64(10))@0 as n] -07)----AggregateExec: mode=Single, gby=[], aggr=[max(Int64(10))] -08)------PlaceholderRowExec +03)----AggregateExec: mode=FinalPartitioned, gby=[n@0 as n], aggr=[count(Int64(1))], ordering_mode=Sorted +04)------RepartitionExec: partitioning=Hash([n@0], 4), input_partitions=1 +05)--------AggregateExec: mode=Partial, gby=[n@0 as n], aggr=[count(Int64(1))], ordering_mode=Sorted +06)----------ProjectionExec: expr=[5 as n] +07)------------PlaceholderRowExec +08)--ProjectionExec: expr=[1 as count, max(Int64(10))@0 as n] +09)----AggregateExec: mode=Single, gby=[], aggr=[max(Int64(10))] +10)------PlaceholderRowExec # Test issue: https://github.com/apache/datafusion/issues/11409 From 49f578410288df253a55a9e982dc45fc6bdc165c Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 23 Dec 2025 16:52:05 +0800 Subject: [PATCH 09/18] Add repartition test for chained aggregates to ensure consistent hash distribution --- .../aggregate_repartition.rs | 82 ------------------- .../enforce_distribution.rs | 59 +++++++++++++ .../core/tests/physical_optimizer/mod.rs | 1 - 3 files changed, 59 insertions(+), 83 deletions(-) delete mode 100644 datafusion/core/tests/physical_optimizer/aggregate_repartition.rs diff --git a/datafusion/core/tests/physical_optimizer/aggregate_repartition.rs b/datafusion/core/tests/physical_optimizer/aggregate_repartition.rs deleted file mode 100644 index ee7bd835dd253..0000000000000 --- a/datafusion/core/tests/physical_optimizer/aggregate_repartition.rs +++ /dev/null @@ -1,82 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Regression test for ensuring repartitioning between chained aggregates. - -use std::sync::Arc; - -use arrow::datatypes::{DataType, Field, Schema}; -use arrow::record_batch::RecordBatch; -use datafusion::datasource::MemTable; -use datafusion::prelude::{SessionConfig, SessionContext}; -use datafusion_common::Result; -use datafusion_expr::col; -use datafusion_functions_aggregate::expr_fn::count; -use datafusion_physical_plan::{collect, displayable}; - -#[tokio::test] -async fn repartition_between_chained_aggregates() -> Result<()> { - // Build a two-partition, empty MemTable with the expected schema to mimic the - // reported failing plan: Sort -> Aggregate(ts, region) -> Sort -> Aggregate(ts). - let schema = Arc::new(Schema::new(vec![ - Field::new("ts", DataType::Int64, true), - Field::new("region", DataType::Utf8, true), - Field::new("value", DataType::Int64, true), - ])); - let empty_batch = RecordBatch::new_empty(schema.clone()); - let partitions = vec![vec![empty_batch.clone()], vec![empty_batch]]; - let mem_table = MemTable::try_new(schema, partitions)?; - - let config = SessionConfig::new().with_target_partitions(2); - let ctx = SessionContext::new_with_config(config); - ctx.register_table("metrics", Arc::new(mem_table))?; - - let df = ctx - .table("metrics") - .await? - .sort(vec![ - col("ts").sort(true, true), - col("region").sort(true, true), - ])? - .aggregate(vec![col("ts"), col("region")], vec![count(col("value"))])? - .sort(vec![ - col("ts").sort(true, true), - col("region").sort(true, true), - ])? - .aggregate(vec![col("ts")], vec![count(col("region"))])?; - - let physical_plan = df.create_physical_plan().await?; - - // The optimizer should either keep the stream single-partitioned via the - // sort-preserving merge, or insert a repartition between the two aggregates - // so that the second grouping sees a consistent hash distribution. Either - // path protects against the panic that was previously reported for this - // plan shape. - let plan_display = displayable(physical_plan.as_ref()).indent(true).to_string(); - let has_repartition = - plan_display.contains("RepartitionExec: partitioning=Hash([ts@0], 2)"); - assert!( - has_repartition || plan_display.contains("SortPreservingMergeExec"), - "Expected either a repartition between aggregates or a sort-preserving merge chain" - ); - - // Execute the optimized plan to ensure the empty, multi-partition pipeline does not panic. 
- let batches = collect(physical_plan, ctx.task_ctx()).await?; - assert!(batches.is_empty()); - - Ok(()) -} diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index d09ad945f41e9..1384ab27960a7 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -3834,3 +3834,62 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn repartition_between_chained_aggregates() -> Result<()> { + // Build a two-partition, empty MemTable with the expected schema to mimic the + // reported failing plan: Sort -> Aggregate(ts, region) -> Sort -> Aggregate(ts). + use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; + use datafusion_expr::col; + use datafusion_functions_aggregate::expr_fn::count; + use datafusion_physical_plan::{collect, displayable}; + + let schema = Arc::new(Schema::new(vec![ + Field::new("ts", DataType::Int64, true), + Field::new("region", DataType::Utf8, true), + Field::new("value", DataType::Int64, true), + ])); + let empty_batch = RecordBatch::new_empty(schema.clone()); + let partitions = vec![vec![empty_batch.clone()], vec![empty_batch]]; + let mem_table = MemTable::try_new(schema, partitions)?; + + let config = SessionConfig::new().with_target_partitions(2); + let ctx = SessionContext::new_with_config(config); + ctx.register_table("metrics", Arc::new(mem_table))?; + + let df = ctx + .table("metrics") + .await? + .sort(vec![ + col("ts").sort(true, true), + col("region").sort(true, true), + ])? + .aggregate(vec![col("ts"), col("region")], vec![count(col("value"))])? + .sort(vec![ + col("ts").sort(true, true), + col("region").sort(true, true), + ])? + .aggregate(vec![col("ts")], vec![count(col("region"))])?; + + let physical_plan = df.create_physical_plan().await?; + + // The optimizer should either keep the stream single-partitioned via the + // sort-preserving merge, or insert a repartition between the two aggregates + // so that the second grouping sees a consistent hash distribution. Either + // path protects against the panic that was previously reported for this + // plan shape. + let plan_display = displayable(physical_plan.as_ref()).indent(true).to_string(); + let has_repartition = + plan_display.contains("RepartitionExec: partitioning=Hash([ts@0], 2)"); + assert!( + has_repartition || plan_display.contains("SortPreservingMergeExec"), + "Expected either a repartition between aggregates or a sort-preserving merge chain" + ); + + // Execute the optimized plan to ensure the empty, multi-partition pipeline does not panic. + let batches = collect(physical_plan, ctx.task_ctx()).await?; + assert!(batches.is_empty()); + + Ok(()) +} diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs index e5fe0742d910e..d11322cd26be9 100644 --- a/datafusion/core/tests/physical_optimizer/mod.rs +++ b/datafusion/core/tests/physical_optimizer/mod.rs @@ -17,7 +17,6 @@ //! 
Physical Optimizer integration tests -mod aggregate_repartition; #[expect(clippy::needless_pass_by_value)] mod aggregate_statistics; mod combine_partial_final_agg; From b896a698872c8ea71fc3ec9c30c3d42b45c9d0de Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 23 Dec 2025 17:13:50 +0800 Subject: [PATCH 10/18] Enhance test assertions to include RepartitionExec for improved partitioning validation --- .../physical_optimizer/enforce_distribution.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 1384ab27960a7..5d7787a3acd9d 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -3676,8 +3676,10 @@ fn single_partition_join_skips_repartition() -> Result<()> { optimized, @r" HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([a@0], 16), input_partitions=1 + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + RepartitionExec: partitioning=Hash([a@0], 16), input_partitions=1 + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet " ); @@ -3729,11 +3731,12 @@ fn grouped_union_from_single_partition_skips_repartition() -> Result<()> { assert_plan!( optimized, @r" - AggregateExec: mode=FinalPartitioned, gby=[group_a@0 as group_a], aggr=[] - AggregateExec: mode=Partial, gby=[a@0 as group_a], aggr=[] - UnionExec - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet - EmptyExec + AggregateExec: mode=FinalPartitioned, gby=[group_a@0 as group_a], aggr=[] + RepartitionExec: partitioning=Hash([group_a@0], 10), input_partitions=1 + AggregateExec: mode=Partial, gby=[a@0 as group_a], aggr=[] + UnionExec + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + EmptyExec " ); From 5cd00f46aeac90a3fa1c56f3a200f23df1e8d35e Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 23 Dec 2025 17:15:38 +0800 Subject: [PATCH 11/18] rm reproducer script --- datafusion-examples/examples/issue_18989.rs | 53 --------------------- 1 file changed, 53 deletions(-) delete mode 100644 datafusion-examples/examples/issue_18989.rs diff --git a/datafusion-examples/examples/issue_18989.rs b/datafusion-examples/examples/issue_18989.rs deleted file mode 100644 index 7237261bd4b7c..0000000000000 --- a/datafusion-examples/examples/issue_18989.rs +++ /dev/null @@ -1,53 +0,0 @@ -use std::sync::Arc; - -use datafusion::arrow::datatypes::{DataType, Field, Schema}; -use datafusion::datasource::MemTable; -use datafusion::functions_aggregate::count::count_udaf; -use datafusion::logical_expr::col; -use datafusion::prelude::*; - -#[tokio::main] -async fn main() { - let ctx = SessionContext::default(); - - let schema = Arc::new(Schema::new(vec![ - Field::new("ts", DataType::Int64, false), - Field::new("region", DataType::Utf8, false), - Field::new("value", DataType::Float64, false), - ])); - - // create an empty but multi-partitioned MemTable - let mem_table = MemTable::try_new(schema.clone(), vec![vec![], vec![]]).unwrap(); - ctx.register_table("metrics", Arc::new(mem_table)).unwrap(); - 
- // aggregate and sort twice - let data_frame = ctx - .table("metrics") - .await - .unwrap() - .aggregate( - vec![col("region"), col("ts")], - vec![count_udaf().call(vec![col("value")])], - ) - .unwrap() - .sort(vec![ - col("region").sort(true, true), - col("ts").sort(true, true), - ]) - .unwrap() - .aggregate( - vec![col("ts")], - vec![count_udaf().call(vec![col("count(metrics.value)")])], - ) - .unwrap() - .sort(vec![col("ts").sort(true, true)]) - .unwrap(); - - println!( - "Logical Plan:\n{}", - data_frame.logical_plan().display_indent() - ); - - data_frame.show().await.unwrap(); - println!("should not panic and print data_frame") -} From da37368ab42249f4650be8d3a10eea86f6197c13 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 23 Dec 2025 17:29:33 +0800 Subject: [PATCH 12/18] Rename test for grouped union to clarify that repartitioning is required --- .../core/tests/physical_optimizer/enforce_distribution.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 5d7787a3acd9d..4b25801d1c080 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -3717,7 +3717,7 @@ fn single_partition_window_partition_skips_repartition() -> Result<()> { } #[test] -fn grouped_union_from_single_partition_skips_repartition() -> Result<()> { +fn grouped_union_from_single_partition_requires_repartition() -> Result<()> { let union = union_exec(vec![ parquet_exec(), Arc::new(EmptyExec::new(schema()).with_partitions(0)), From acf0ea3333b5e6bd49792eaeb7af2c8cddc10f5d Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 23 Dec 2025 17:42:31 +0800 Subject: [PATCH 13/18] Clarify test description for repartitioning between chained aggregates to reference issue #18989 --- .../core/tests/physical_optimizer/enforce_distribution.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 4b25801d1c080..dd0288877790f 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -3879,9 +3879,8 @@ async fn repartition_between_chained_aggregates() -> Result<()> { // The optimizer should either keep the stream single-partitioned via the // sort-preserving merge, or insert a repartition between the two aggregates - // so that the second grouping sees a consistent hash distribution. Either - // path protects against the panic that was previously reported for this - // plan shape. + // so that the second grouping sees a consistent hash distribution. + // This test is similar to the reproducer case in #18989 let plan_display = displayable(physical_plan.as_ref()).indent(true).to_string(); let has_repartition = plan_display.contains("RepartitionExec: partitioning=Hash([ts@0], 2)"); From ef7d6daba003ce03665464033f636b710bce42e3 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 23 Dec 2025 17:50:54 +0800 Subject: [PATCH 14/18] Refactor enforce_distribution and update test expectations Add necessary imports to enforce_distribution.rs and remove unused BoundedWindowAggExec import. Move the test for superset hash keys to a more logical position, ensuring consistency in testing for hash join partitioning. 
Update test expectations to reflect current optimizer behavior by removing unnecessary superset repartitions and replacing them with exact match requirements. Revise comments and assertions to align with the improved optimization. Delete the original enforce_distribution_superset.rs file. --- .../enforce_distribution.rs | 103 +++++++++++++-- .../enforce_distribution_superset.rs | 122 ------------------ 2 files changed, 95 insertions(+), 130 deletions(-) delete mode 100644 datafusion/core/tests/physical_optimizer/enforce_distribution_superset.rs diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index dd0288877790f..be3ad427edcf5 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -21,8 +21,8 @@ use std::sync::Arc; use crate::physical_optimizer::test_utils::{ bounded_window_exec_with_partition, check_integrity, coalesce_partitions_exec, - parquet_exec_with_sort, parquet_exec_with_stats, repartition_exec, schema, sort_exec, - sort_exec_with_preserve_partitioning, sort_merge_join_exec, + memory_exec, parquet_exec_with_sort, parquet_exec_with_stats, repartition_exec, + schema, sort_exec, sort_exec_with_preserve_partitioning, sort_merge_join_exec, sort_preserving_merge_exec, union_exec, }; @@ -37,16 +37,16 @@ use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{CsvSource, ParquetSource}; use datafusion::datasource::source::DataSourceExec; use datafusion::prelude::{SessionConfig, SessionContext}; -use datafusion_common::ScalarValue; use datafusion_common::config::CsvOptions; use datafusion_common::error::Result; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_common::{NullEquality, ScalarValue}; use datafusion_datasource::file_groups::FileGroup; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_expr::{JoinType, Operator}; -use datafusion_physical_expr::Distribution; use datafusion_physical_expr::PartitioningSatisfaction; use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, binary, lit}; +use datafusion_physical_expr::{Distribution, Partitioning, physical_exprs_equal}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{ LexOrdering, OrderingRequirements, PhysicalSortExpr, @@ -66,17 +66,16 @@ use datafusion_physical_plan::empty::EmptyExec; use datafusion_physical_plan::execution_plan::ExecutionPlan; use datafusion_physical_plan::expressions::col; use datafusion_physical_plan::filter::FilterExec; -use datafusion_physical_plan::joins::HashJoinExec; use datafusion_physical_plan::joins::utils::JoinOn; +use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::union::UnionExec; -use datafusion_physical_plan::windows::BoundedWindowAggExec; use datafusion_physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlanProperties, Partitioning, PlanProperties, - Statistics, displayable, + DisplayAs, DisplayFormatType, ExecutionPlanProperties, PlanProperties, Statistics, + 
displayable,
 };
 
 use insta::Settings;
@@ -1114,6 +1113,94 @@ fn multi_hash_join_key_ordering() -> Result<()> {
     Ok(())
 }
 
+#[test]
+fn enforce_distribution_repartitions_superset_hash_keys() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("region", DataType::Utf8, true),
+        Field::new("ts", DataType::Int64, true),
+    ]));
+    let left = memory_exec(&schema);
+    let right = memory_exec(&schema);
+
+    let left_superset = Arc::new(RepartitionExec::try_new(
+        Arc::clone(&left),
+        Partitioning::Hash(
+            vec![
+                Arc::new(Column::new_with_schema("region", &schema).unwrap()),
+                Arc::new(Column::new_with_schema("ts", &schema).unwrap()),
+            ],
+            4,
+        ),
+    )?);
+
+    let right_partitioned = Arc::new(RepartitionExec::try_new(
+        right,
+        Partitioning::Hash(
+            vec![Arc::new(Column::new_with_schema("ts", &schema).unwrap())],
+            4,
+        ),
+    )?);
+
+    let on: JoinOn = vec![(
+        Arc::new(Column::new_with_schema("ts", &schema).unwrap()),
+        Arc::new(Column::new_with_schema("ts", &schema).unwrap()),
+    )];
+
+    let join = Arc::new(HashJoinExec::try_new(
+        left_superset,
+        right_partitioned,
+        on,
+        None,
+        &JoinType::Inner,
+        None,
+        PartitionMode::Partitioned,
+        NullEquality::NullEqualsNothing,
+    )?);
+
+    let mut config = ConfigOptions::new();
+    config.execution.target_partitions = 4;
+
+    let optimized = EnforceDistribution::new().optimize(join, &config)?;
+    let join = optimized
+        .as_any()
+        .downcast_ref::<HashJoinExec>()
+        .expect("expected hash join");
+
+    // The optimizer should recognize that the left side has a superset partitioning
+    // (partition on both "region" and "ts") when the join only requires "ts".
+    // The optimizer should replace the superset repartition with one that matches
+    // the join requirement exactly (just "ts").
+    let repartition = join
+        .left()
+        .as_any()
+        .downcast_ref::<RepartitionExec>()
+        .expect("left side should be repartitioned");
+
+    match repartition.partitioning() {
+        Partitioning::Hash(exprs, partitions) => {
+            assert_eq!(*partitions, 4);
+            let expected =
+                vec![Arc::new(Column::new_with_schema("ts", &schema).unwrap()) as _];
+            assert!(
+                physical_exprs_equal(exprs, &expected),
+                "expected repartitioning on [ts]"
+            );
+        }
+        other => panic!("expected hash repartitioning, got {other:?}"),
+    }
+
+    // The optimizer should have removed the unnecessary superset repartition
+    // and directly partitioned the source on the required keys.
+    // This is a better optimization than keeping the superset.
+    let input = repartition.input();
+    assert!(
+        input.as_any().downcast_ref::<RepartitionExec>().is_none(),
+        "optimizer should remove the superset repartition, not stack another on top"
+    );
+
+    Ok(())
+}
+
 #[test]
 fn reorder_join_keys_to_left_input() -> Result<()> {
     let left = parquet_exec();
diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution_superset.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution_superset.rs
deleted file mode 100644
index 2178a5e0fb742..0000000000000
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution_superset.rs
+++ /dev/null
@@ -1,122 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::sync::Arc;
-
-use arrow_schema::{DataType, Field, Schema, SchemaRef};
-use crate::physical_optimizer::test_utils::memory_exec;
-use datafusion_common::config::ConfigOptions;
-use datafusion_common::{JoinType, NullEquality, Result};
-use datafusion_physical_expr::expressions::Column;
-use datafusion_physical_expr::{Partitioning, physical_exprs_equal};
-use datafusion_physical_optimizer::enforce_distribution::EnforceDistribution;
-use datafusion_physical_plan::joins::utils::JoinOn;
-use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
-use datafusion_physical_plan::repartition::RepartitionExec;
-
-fn repartition_superset_schema() -> SchemaRef {
-    Arc::new(Schema::new(vec![
-        Field::new("region", DataType::Utf8, true),
-        Field::new("ts", DataType::Int64, true),
-    ]))
-}
-
-#[test]
-fn enforce_distribution_repartitions_superset_hash_keys() -> Result<()> {
-    let schema = repartition_superset_schema();
-    let left = memory_exec(&schema);
-    let right = memory_exec(&schema);
-
-    let left_superset = Arc::new(RepartitionExec::try_new(
-        Arc::clone(&left),
-        Partitioning::Hash(
-            vec![
-                Arc::new(Column::new_with_schema("region", &schema).unwrap()),
-                Arc::new(Column::new_with_schema("ts", &schema).unwrap()),
-            ],
-            4,
-        ),
-    )?);
-
-    let right_partitioned = Arc::new(RepartitionExec::try_new(
-        right,
-        Partitioning::Hash(
-            vec![Arc::new(Column::new_with_schema("ts", &schema).unwrap())],
-            4,
-        ),
-    )?);
-
-    let on: JoinOn = vec![
-        (
-            Arc::new(Column::new_with_schema("ts", &schema).unwrap()),
-            Arc::new(Column::new_with_schema("ts", &schema).unwrap()),
-        ),
-    ];
-
-    let join = Arc::new(HashJoinExec::try_new(
-        left_superset,
-        right_partitioned,
-        on,
-        None,
-        &JoinType::Inner,
-        None,
-        PartitionMode::Partitioned,
-        NullEquality::NullEqualsNothing,
-    )?);
-
-    let mut config = ConfigOptions::new();
-    config.execution.target_partitions = 4;
-
-    let optimized = EnforceDistribution::new().optimize(join, &config)?;
-    let join = optimized
-        .as_any()
-        .downcast_ref::<HashJoinExec>()
-        .expect("expected hash join");
-
-    let repartition = join
-        .left()
-        .as_any()
-        .downcast_ref::<RepartitionExec>()
-        .expect("left side should be repartitioned");
-
-    match repartition.partitioning() {
-        Partitioning::Hash(exprs, partitions) => {
-            assert_eq!(*partitions, 4);
-            let expected =
-                vec![Arc::new(Column::new_with_schema("ts", &schema).unwrap()) as _];
-            assert!(
-                physical_exprs_equal(exprs, &expected),
-                "expected repartitioning on [ts]"
-            );
-        }
-        other => panic!("expected hash repartitioning, got {other:?}"),
-    }
-
-    // The original (superset) partitioning should remain below the new repartition
-    // so we can rehash to the required keys even when the partition counts already match.
-    let original = repartition
-        .input()
-        .as_any()
-        .downcast_ref::<RepartitionExec>()
-        .expect("expected original repartition to remain");
-    assert!(matches!(
-        original.partitioning(),
-        Partitioning::Hash(_, 4)
-    ));
-
-    Ok(())
-}
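
Note (illustrative aside, not part of the patch series): the superset-hash behaviour that patch 14 exercises can be seen directly from the two partitionings involved. The sketch below reuses only constructs that already appear in the tests above (Partitioning::Hash, Column::new_with_schema); the standalone main wrapper and the exact crate paths are assumptions and may need adjusting to the workspace.

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;

fn main() {
    let schema = Schema::new(vec![
        Field::new("region", DataType::Utf8, true),
        Field::new("ts", DataType::Int64, true),
    ]);
    let region: Arc<dyn PhysicalExpr> =
        Arc::new(Column::new_with_schema("region", &schema).unwrap());
    let ts: Arc<dyn PhysicalExpr> =
        Arc::new(Column::new_with_schema("ts", &schema).unwrap());

    // Existing layout: rows are routed by hash(region, ts) % 4, so two rows with
    // equal ts but different region values may land in different partitions.
    let existing = Partitioning::Hash(vec![region, Arc::clone(&ts)], 4);
    // Requirement of the join: all rows with equal ts in one partition, i.e.
    // hash(ts) % 4. The superset layout cannot guarantee that, which is why
    // EnforceDistribution has to repartition on exactly [ts].
    let required = Partitioning::Hash(vec![ts], 4);
    println!("existing: {existing:?}\nrequired: {required:?}");
}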
From 6461c8eafb29cc8ef498f3f57ca50e59d70e5cc3 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Wed, 24 Dec 2025 15:32:44 +0800
Subject: [PATCH 15/18] Rename test for single partition join to clarify that
 repartitioning is required

---
 .../core/tests/physical_optimizer/enforce_distribution.rs | 2 +-
 datafusion/physical-optimizer/src/enforce_distribution.rs | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index be3ad427edcf5..a3b7775c883a8 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -3747,7 +3747,7 @@ fn distribution_satisfaction_superset_hash_matches_sanity_check() -> Result<()>
 }
 
 #[test]
-fn single_partition_join_skips_repartition() -> Result<()> {
+fn single_partition_join_requires_repartition() -> Result<()> {
     let left = parquet_exec();
     let right = parquet_exec();
     let join_on = vec![(
diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs
index 718db8f9ab62d..bf44618b21f5a 100644
--- a/datafusion/physical-optimizer/src/enforce_distribution.rs
+++ b/datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -1314,7 +1314,6 @@ pub fn ensure_distribution(
                 child = add_merge_on_top(child);
             }
             Distribution::HashPartitioned(_) => {
-                // See https://github.com/apache/datafusion/issues/18341#issuecomment-3503238325 for background
                 let satisfaction = distribution_satisfaction(
                     &child.plan,
                     &requirement,
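
Note (illustrative aside, not part of the patches): the rename in patch 15 reflects what a partitioned hash join actually demands from its children. A minimal sketch of that requirement, assuming the Distribution and Column paths used in the diffs above; the helper name join_key_requirement is made up for illustration.

use std::sync::Arc;

use datafusion_physical_expr::Distribution;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;

// Each input of a PartitionMode::Partitioned hash join must be hash-partitioned
// on its join key. A single-partition DataSourceExec does not satisfy this once
// target_partitions > 1, so EnforceDistribution inserts
// RepartitionExec: partitioning=Hash([a@0], 16) on both sides, as the renamed
// test asserts.
fn join_key_requirement() -> Distribution {
    let a: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
    Distribution::HashPartitioned(vec![a])
}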
From 9a734077b2a99dd92e31ab86975b4b1b5b969f88 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Wed, 24 Dec 2025 16:34:14 +0800
Subject: [PATCH 16/18] Refactor enforce_distribution tests: remove redundant
 tests and clarify repartitioning logic

---
 .../enforce_distribution.rs | 193 +-----------------
 1 file changed, 11 insertions(+), 182 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index a3b7775c883a8..e787c9c511fbb 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -20,9 +20,9 @@ use std::ops::Deref;
 use std::sync::Arc;
 
 use crate::physical_optimizer::test_utils::{
-    bounded_window_exec_with_partition, check_integrity, coalesce_partitions_exec,
-    memory_exec, parquet_exec_with_sort, parquet_exec_with_stats, repartition_exec,
-    schema, sort_exec, sort_exec_with_preserve_partitioning, sort_merge_join_exec,
+    check_integrity, coalesce_partitions_exec, parquet_exec_with_sort,
+    parquet_exec_with_stats, repartition_exec, schema, sort_exec,
+    sort_exec_with_preserve_partitioning, sort_merge_join_exec,
     sort_preserving_merge_exec, union_exec,
 };
 
@@ -37,16 +37,16 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{CsvSource, ParquetSource};
 use datafusion::datasource::source::DataSourceExec;
 use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_common::ScalarValue;
 use datafusion_common::config::CsvOptions;
 use datafusion_common::error::Result;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::{NullEquality, ScalarValue};
 use datafusion_datasource::file_groups::FileGroup;
 use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
 use datafusion_expr::{JoinType, Operator};
 use datafusion_physical_expr::PartitioningSatisfaction;
 use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, binary, lit};
-use datafusion_physical_expr::{Distribution, Partitioning, physical_exprs_equal};
+use datafusion_physical_expr::{Distribution, Partitioning};
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::{
     LexOrdering, OrderingRequirements, PhysicalSortExpr,
@@ -62,12 +62,10 @@ use datafusion_physical_plan::aggregates::{
 };
 use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use datafusion_physical_plan::empty::EmptyExec;
 use datafusion_physical_plan::execution_plan::ExecutionPlan;
 use datafusion_physical_plan::expressions::col;
 use datafusion_physical_plan::filter::FilterExec;
 use datafusion_physical_plan::joins::utils::JoinOn;
-use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
 use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
 use datafusion_physical_plan::repartition::RepartitionExec;
@@ -1113,94 +1111,6 @@ fn multi_hash_join_key_ordering() -> Result<()> {
     Ok(())
 }
 
-#[test]
-fn enforce_distribution_repartitions_superset_hash_keys() -> Result<()> {
-    let schema = Arc::new(Schema::new(vec![
-        Field::new("region", DataType::Utf8, true),
-        Field::new("ts", DataType::Int64, true),
-    ]));
-    let left = memory_exec(&schema);
-    let right = memory_exec(&schema);
-
-    let left_superset = Arc::new(RepartitionExec::try_new(
-        Arc::clone(&left),
-        Partitioning::Hash(
-            vec![
-                Arc::new(Column::new_with_schema("region", &schema).unwrap()),
-                Arc::new(Column::new_with_schema("ts", &schema).unwrap()),
-            ],
-            4,
-        ),
-    )?);
-
-    let right_partitioned = Arc::new(RepartitionExec::try_new(
-        right,
-        Partitioning::Hash(
-            vec![Arc::new(Column::new_with_schema("ts", &schema).unwrap())],
-            4,
-        ),
-    )?);
-
-    let on: JoinOn = vec![(
-        Arc::new(Column::new_with_schema("ts", &schema).unwrap()),
-        Arc::new(Column::new_with_schema("ts", &schema).unwrap()),
-    )];
-
-    let join = Arc::new(HashJoinExec::try_new(
-        left_superset,
-        right_partitioned,
-        on,
-        None,
-        &JoinType::Inner,
-        None,
-        PartitionMode::Partitioned,
-        NullEquality::NullEqualsNothing,
-    )?);
-
-    let mut config = ConfigOptions::new();
-    config.execution.target_partitions = 4;
-
-    let optimized = EnforceDistribution::new().optimize(join, &config)?;
-    let join = optimized
-        .as_any()
-        .downcast_ref::<HashJoinExec>()
-        .expect("expected hash join");
-
-    // The optimizer should recognize that the left side has a superset partitioning
-    // (partition on both "region" and "ts") when the join only requires "ts".
-    // The optimizer should replace the superset repartition with one that matches
-    // the join requirement exactly (just "ts").
-    let repartition = join
-        .left()
-        .as_any()
-        .downcast_ref::<RepartitionExec>()
-        .expect("left side should be repartitioned");
-
-    match repartition.partitioning() {
-        Partitioning::Hash(exprs, partitions) => {
-            assert_eq!(*partitions, 4);
-            let expected =
-                vec![Arc::new(Column::new_with_schema("ts", &schema).unwrap()) as _];
-            assert!(
-                physical_exprs_equal(exprs, &expected),
-                "expected repartitioning on [ts]"
-            );
-        }
-        other => panic!("expected hash repartitioning, got {other:?}"),
-    }
-
-    // The optimizer should have removed the unnecessary superset repartition
-    // and directly partitioned the source on the required keys.
-    // This is a better optimization than keeping the superset.
-    let input = repartition.input();
-    assert!(
-        input.as_any().downcast_ref::<RepartitionExec>().is_none(),
-        "optimizer should remove the superset repartition, not stack another on top"
-    );
-
-    Ok(())
-}
-
 #[test]
 fn reorder_join_keys_to_left_input() -> Result<()> {
     let left = parquet_exec();
@@ -3746,90 +3656,6 @@ fn distribution_satisfaction_superset_hash_matches_sanity_check() -> Result<()>
     )
 }
 
-#[test]
-fn single_partition_join_requires_repartition() -> Result<()> {
-    let left = parquet_exec();
-    let right = parquet_exec();
-    let join_on = vec![(
-        Arc::new(Column::new_with_schema("a", &schema()).unwrap()) as _,
-        Arc::new(Column::new_with_schema("a", &schema()).unwrap()) as _,
-    )];
-
-    let plan = hash_join_exec(left, right, &join_on, &JoinType::Inner);
-    let config = TestConfig::default().with_query_execution_partitions(16);
-    let optimized = config.to_plan(plan, &[Run::Distribution]);
-
-    assert_plan!(
-        optimized,
-        @r"
-    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)]
-      RepartitionExec: partitioning=Hash([a@0], 16), input_partitions=1
-        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-      RepartitionExec: partitioning=Hash([a@0], 16), input_partitions=1
-        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-    "
-    );
-
-    Ok(())
-}
-
-#[test]
-fn single_partition_window_partition_skips_repartition() -> Result<()> {
-    let schema = schema();
-    let sort_exprs = vec![PhysicalSortExpr {
-        expr: col("a", &schema)?,
-        options: SortOptions::default(),
-    }];
-    let partition_by = vec![col("b", &schema)?];
-
-    let window_plan = bounded_window_exec_with_partition(
-        "c",
-        sort_exprs.clone(),
-        &partition_by,
-        parquet_exec(),
-    );
-    let config = TestConfig::default().with_query_execution_partitions(12);
-    let optimized = config.to_plan(window_plan, &[Run::Distribution, Run::Sorting]);
-
-    assert_plan!(
-        optimized,
-        @r#"
-    BoundedWindowAggExec: wdw=[count: Field { "count": Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
-      SortExec: expr=[b@1 ASC NULLS LAST, a@0 ASC], preserve_partitioning=[false]
-        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-    "#
-    );
-
-    Ok(())
-}
-
-#[test]
-fn grouped_union_from_single_partition_requires_repartition() -> Result<()> {
-    let union = union_exec(vec![
-        parquet_exec(),
-        Arc::new(EmptyExec::new(schema()).with_partitions(0)),
-    ]);
-    let aggregated =
-        aggregate_exec_with_alias(union, vec![("a".to_string(), "group_a".to_string())]);
-    let mut config = TestConfig::default().with_query_execution_partitions(10);
-    config.config.optimizer.enable_round_robin_repartition = false;
-    let optimized = config.to_plan(aggregated, &[Run::Distribution]);
-
-    assert_plan!(
-        optimized,
-        @r"
-    AggregateExec: mode=FinalPartitioned, gby=[group_a@0 as group_a], aggr=[]
-      RepartitionExec: partitioning=Hash([group_a@0], 10), input_partitions=1
-        AggregateExec: mode=Partial, gby=[a@0 as group_a], aggr=[]
-          UnionExec
-            DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-            EmptyExec
-    "
-    );
-
-    Ok(())
-}
-
 fn assert_hash_satisfaction_alignment(
     partitioning_exprs: Vec<Arc<dyn PhysicalExpr>>,
     required_exprs: Vec<Arc<dyn PhysicalExpr>>,
@@ -3927,8 +3753,12 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> {
 
 #[tokio::test]
 async fn repartition_between_chained_aggregates() -> Result<()> {
-    // Build a two-partition, empty MemTable with the expected schema to mimic the
-    // reported failing plan: Sort -> Aggregate(ts, region) -> Sort -> Aggregate(ts).
+    // Regression test for issue #18989: Sanity Check Failure in Multi-Partitioned Tables
+    // with Aggregations. This test ensures the optimizer properly handles chained aggregations
+    // with different grouping keys (first aggregation groups by [ts, region], second by [ts] only).
+    // The plan: Sort -> Aggregate(ts, region) -> Sort -> Aggregate(ts).
+    // The optimizer must either insert a repartition between aggregates or maintain a single
+    // partition stream to avoid distribution requirement mismatches.
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow::record_batch::RecordBatch;
     use datafusion_expr::col;
@@ -3967,7 +3797,6 @@ async fn repartition_between_chained_aggregates() -> Result<()> {
     // The optimizer should either keep the stream single-partitioned via the
     // sort-preserving merge, or insert a repartition between the two aggregates
     // so that the second grouping sees a consistent hash distribution.
-    // This test is similar to the reproducer case in #18989
     let plan_display = displayable(physical_plan.as_ref()).indent(true).to_string();
     let has_repartition =
         plan_display.contains("RepartitionExec: partitioning=Hash([ts@0], 2)");
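
Note (illustrative aside, not part of the patches): the retained regression test drives the #18989 plan shape through the DataFrame API. For readers who think in SQL, the same chained-aggregate shape is roughly the query below; the query text is an approximation and has not been added to any test file, and it assumes the same two-partition, empty `metrics` MemTable registered by the test.

async fn chained_aggregates_sql(
    ctx: &datafusion::prelude::SessionContext,
) -> datafusion_common::Result<()> {
    // Inner aggregate groups by (ts, region); outer aggregate regroups by ts only,
    // which is what forces the optimizer to reconcile the two hash distributions.
    let df = ctx
        .sql(
            "SELECT ts, COUNT(region) \
             FROM (SELECT ts, region, COUNT(value) FROM metrics GROUP BY ts, region) AS t \
             GROUP BY ts",
        )
        .await?;
    // Like the DataFrame version, this must not panic over the empty,
    // multi-partition source.
    let _batches = df.collect().await?;
    Ok(())
}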
From 1ad98ec6ad6be1ccdd3239780a0b3a8a3f971996 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Wed, 24 Dec 2025 16:55:46 +0800
Subject: [PATCH 17/18] Refactor sql_interval_to_expr calls to use Self:: for
 consistency

---
 datafusion/sql/src/expr/mod.rs      | 2 +-
 datafusion/sql/src/expr/unary_op.rs | 2 +-
 datafusion/sql/src/expr/value.rs    | 5 ++---
 3 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs
index fcd7d6376d21c..77f01a016c8df 100644
--- a/datafusion/sql/src/expr/mod.rs
+++ b/datafusion/sql/src/expr/mod.rs
@@ -233,7 +233,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
             }
 
             SQLExpr::Array(arr) => self.sql_array_literal(arr.elem, schema),
-            SQLExpr::Interval(interval) => self.sql_interval_to_expr(false, interval),
+            SQLExpr::Interval(interval) => Self::sql_interval_to_expr(false, interval),
             SQLExpr::Identifier(id) => {
                 self.sql_identifier_to_expr(id, schema, planner_context)
             }
diff --git a/datafusion/sql/src/expr/unary_op.rs b/datafusion/sql/src/expr/unary_op.rs
index cd118c0fdd5c5..0d921fe0cbee5 100644
--- a/datafusion/sql/src/expr/unary_op.rs
+++ b/datafusion/sql/src/expr/unary_op.rs
@@ -69,7 +69,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                     span: _,
                 }) => self.parse_sql_number(&n, true),
                 SQLExpr::Interval(interval) => {
-                    self.sql_interval_to_expr(true, interval)
+                    Self::sql_interval_to_expr(true, interval)
                 }
                 // Not a literal, apply negative operator on expression
                 _ => Ok(Expr::Negative(Box::new(self.sql_expr_to_logical_expr(
diff --git a/datafusion/sql/src/expr/value.rs b/datafusion/sql/src/expr/value.rs
index 7808f8cb5d553..6de2d729ad9f9 100644
--- a/datafusion/sql/src/expr/value.rs
+++ b/datafusion/sql/src/expr/value.rs
@@ -187,7 +187,6 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
     /// Convert a SQL interval expression to a DataFusion logical plan
     /// expression
     pub(super) fn sql_interval_to_expr(
-        &self,
         negative: bool,
         interval: Interval,
     ) -> Result<Expr> {
@@ -220,7 +219,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                         return not_impl_err!("Unsupported interval operator: {op:?}");
                     }
                 };
-                let left_expr = self.sql_interval_to_expr(
+                let left_expr = Self::sql_interval_to_expr(
                     negative,
                     Interval {
                         value: left,
@@ -230,7 +229,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                         fractional_seconds_precision: None,
                     },
                 )?;
-                let right_expr = self.sql_interval_to_expr(
+                let right_expr = Self::sql_interval_to_expr(
                     false,
                     Interval {
                         value: right,
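
Note (illustrative aside, not part of the patches): patch 17 is the standard refactor of a method that never reads self into an associated function. A generic sketch of the pattern with placeholder types; Planner and helper are not DataFusion names.

struct Planner;

impl Planner {
    // Before: fn helper(&self, negative: bool) -> i64. The receiver was never
    // used, so dropping it turns the method into an associated function.
    fn helper(negative: bool) -> i64 {
        if negative { -1 } else { 1 }
    }

    fn plan(&self, negative: bool) -> i64 {
        // Call sites change from `self.helper(negative)` to:
        Self::helper(negative)
    }
}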
From d78e6a0dec3e362222cc7f430cb0a01b4b3943d8 Mon Sep 17 00:00:00 2001
From: Siew Kam Onn
Date: Wed, 24 Dec 2025 17:21:16 +0800
Subject: [PATCH 18/18] Fix: Update SortMergeJoin to SortMergeJoinExec in
 physical plan output

---
 datafusion/sqllogictest/test_files/sort_merge_join.slt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/sqllogictest/test_files/sort_merge_join.slt b/datafusion/sqllogictest/test_files/sort_merge_join.slt
index dacc3bbe1f98b..c158e21ea9f16 100644
--- a/datafusion/sqllogictest/test_files/sort_merge_join.slt
+++ b/datafusion/sqllogictest/test_files/sort_merge_join.slt
@@ -37,7 +37,7 @@ logical_plan
 02)--TableScan: t1 projection=[a, b]
 03)--TableScan: t2 projection=[a, b]
 physical_plan
-01)SortMergeJoin: join_type=Inner, on=[(a@0, a@0)], filter=CAST(b@1 AS Int64) * 50 <= CAST(b@0 AS Int64)
+01)SortMergeJoinExec: join_type=Inner, on=[(a@0, a@0)], filter=CAST(b@1 AS Int64) * 50 <= CAST(b@0 AS Int64)
 02)--SortExec: expr=[a@0 ASC], preserve_partitioning=[true]
 03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
 04)------DataSourceExec: partitions=1, partition_sizes=[1]
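
Note (illustrative aside, not part of the patches): patch 18 only tracks the operator's display-name change, but any text-based plan assertion has to follow it. A minimal sketch, assuming `plan` already contains a sort-merge join; the helper name is made up for illustration.

use std::sync::Arc;

use datafusion_physical_plan::displayable;
use datafusion_physical_plan::execution_plan::ExecutionPlan;

fn assert_uses_sort_merge_join(plan: &Arc<dyn ExecutionPlan>) {
    let text = displayable(plan.as_ref()).indent(true).to_string();
    assert!(
        text.contains("SortMergeJoinExec:"),
        "expected the renamed operator in the plan:\n{text}"
    );
}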