diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index 7cedaf86cb52f..f10cde9b0a71b 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -44,7 +44,9 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_datasource::file_groups::FileGroup;
 use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
 use datafusion_expr::{JoinType, Operator};
+use datafusion_physical_expr::PartitioningSatisfaction;
 use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, binary, lit};
+use datafusion_physical_expr::{Distribution, Partitioning};
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::{
     LexOrdering, OrderingRequirements, PhysicalSortExpr,
@@ -52,7 +54,9 @@ use datafusion_physical_expr_common::sort_expr::{
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_optimizer::enforce_distribution::*;
 use datafusion_physical_optimizer::enforce_sorting::EnforceSorting;
+use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
 use datafusion_physical_optimizer::output_requirements::OutputRequirements;
+use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan;
 use datafusion_physical_plan::aggregates::{
     AggregateExec, AggregateMode, PhysicalGroupBy,
 };
@@ -64,6 +68,7 @@ use datafusion_physical_plan::filter::FilterExec;
 use datafusion_physical_plan::joins::utils::JoinOn;
 use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
+use datafusion_physical_plan::repartition::RepartitionExec;
 use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use datafusion_physical_plan::union::UnionExec;
 use datafusion_physical_plan::{
@@ -3609,6 +3614,76 @@ async fn test_distribute_sort_memtable() -> Result<()> {
     Ok(())
 }
 
+#[test]
+fn distribution_satisfaction_exact_hash_matches_sanity_check() -> Result<()> {
+    let schema = schema();
+    let col_a: Arc<dyn PhysicalExpr> = Arc::new(Column::new_with_schema("a", &schema)?);
+    let col_b: Arc<dyn PhysicalExpr> = Arc::new(Column::new_with_schema("b", &schema)?);
+
+    assert_hash_satisfaction_alignment(
+        vec![Arc::clone(&col_a), Arc::clone(&col_b)],
+        vec![col_a, col_b],
+        PartitioningSatisfaction::Exact,
+        false,
+    )
+}
+
+#[test]
+fn distribution_satisfaction_subset_hash_matches_sanity_check() -> Result<()> {
+    let schema = schema();
+    let col_a: Arc<dyn PhysicalExpr> = Arc::new(Column::new_with_schema("a", &schema)?);
+    let col_b: Arc<dyn PhysicalExpr> = Arc::new(Column::new_with_schema("b", &schema)?);
+
+    assert_hash_satisfaction_alignment(
+        vec![Arc::clone(&col_a)],
+        vec![col_a, col_b],
+        PartitioningSatisfaction::Subset,
+        false,
+    )
+}
+
+#[test]
+fn distribution_satisfaction_superset_hash_matches_sanity_check() -> Result<()> {
+    let schema = schema();
+    let col_a: Arc<dyn PhysicalExpr> = Arc::new(Column::new_with_schema("a", &schema)?);
+    let col_b: Arc<dyn PhysicalExpr> = Arc::new(Column::new_with_schema("b", &schema)?);
+
+    assert_hash_satisfaction_alignment(
+        vec![Arc::clone(&col_a), Arc::clone(&col_b)],
+        vec![col_a],
+        PartitioningSatisfaction::NotSatisfied,
+        true,
+    )
+}
+
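These three cases pin down the `PartitioningSatisfaction` semantics: hashing on a subset of the required keys still co-locates every group the requirement cares about, while hashing on a superset can split such a group across partitions. A toy, self-contained illustration of why (hypothetical values; not DataFusion's actual hash function):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Toy stand-in for a hash partitioner: bucket = hash(key) % partitions.
fn bucket<T: Hash>(key: T, partitions: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % partitions
}

fn main() {
    // Hash([a, b]) vs a requirement on [a] alone: rows (a=1, b=1) and
    // (a=1, b=2) share `a` but typically land in different buckets, so a
    // consumer grouping on `a` can see the same key in two partitions.
    // That is the NotSatisfied case the third test expects to fail.
    println!("(a=1, b=1) -> bucket {}", bucket((1, 1), 4));
    println!("(a=1, b=2) -> bucket {}", bucket((1, 2), 4));

    // Hash([a]) vs a requirement on [a, b]: rows equal on (a, b) are equal
    // on `a`, hence always co-located -- the Subset case is safe.
    println!("(a=1, *)   -> bucket {}", bucket(1, 4));
}
```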
+fn assert_hash_satisfaction_alignment(
+    partitioning_exprs: Vec<Arc<dyn PhysicalExpr>>,
+    required_exprs: Vec<Arc<dyn PhysicalExpr>>,
+    expected: PartitioningSatisfaction,
+    expect_err: bool,
+) -> Result<()> {
+    let child: Arc<dyn ExecutionPlan> = Arc::new(RepartitionExec::try_new(
+        parquet_exec(),
+        Partitioning::Hash(partitioning_exprs, 4),
+    )?);
+
+    let requirement = Distribution::HashPartitioned(required_exprs);
+    let satisfaction = distribution_satisfaction(&child, &requirement, true);
+    assert_eq!(satisfaction.satisfaction(), expected);
+
+    let parent: Arc<dyn ExecutionPlan> =
+        Arc::new(OutputRequirementExec::new(child, None, requirement, None));
+    let sanity = SanityCheckPlan::new().optimize(parent, &ConfigOptions::default());
+
+    if expect_err {
+        assert!(sanity.is_err());
+    } else {
+        sanity?;
+    }
+
+    Ok(())
+}
+
 /// Create a [`MemTable`] with 100 batches of 8192 rows each, in 1 partition
 fn create_memtable() -> Result<MemTable> {
     let mut batches = Vec::with_capacity(100);
@@ -3675,3 +3750,64 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn repartition_between_chained_aggregates() -> Result<()> {
+    // Regression test for issue #18989: Sanity Check Failure in Multi-Partitioned
+    // Tables with Aggregations. This test ensures the optimizer properly handles
+    // chained aggregations with different grouping keys (the first aggregation
+    // groups by [ts, region], the second by [ts] only).
+    // The plan: Sort -> Aggregate(ts, region) -> Sort -> Aggregate(ts).
+    // The optimizer must either insert a repartition between the aggregates or
+    // maintain a single-partition stream to avoid distribution requirement mismatches.
+    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow::record_batch::RecordBatch;
+    use datafusion_expr::col;
+    use datafusion_functions_aggregate::expr_fn::count;
+    use datafusion_physical_plan::{collect, displayable};
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("ts", DataType::Int64, true),
+        Field::new("region", DataType::Utf8, true),
+        Field::new("value", DataType::Int64, true),
+    ]));
+    let empty_batch = RecordBatch::new_empty(schema.clone());
+    let partitions = vec![vec![empty_batch.clone()], vec![empty_batch]];
+    let mem_table = MemTable::try_new(schema, partitions)?;
+
+    let config = SessionConfig::new().with_target_partitions(2);
+    let ctx = SessionContext::new_with_config(config);
+    ctx.register_table("metrics", Arc::new(mem_table))?;
+
+    let df = ctx
+        .table("metrics")
+        .await?
+        .sort(vec![
+            col("ts").sort(true, true),
+            col("region").sort(true, true),
+        ])?
+        .aggregate(vec![col("ts"), col("region")], vec![count(col("value"))])?
+        .sort(vec![
+            col("ts").sort(true, true),
+            col("region").sort(true, true),
+        ])?
+        .aggregate(vec![col("ts")], vec![count(col("region"))])?;
+
+    let physical_plan = df.create_physical_plan().await?;
+
+    // The optimizer should either keep the stream single-partitioned via the
+    // sort-preserving merge, or insert a repartition between the two aggregates
+    // so that the second grouping sees a consistent hash distribution.
+    let plan_display = displayable(physical_plan.as_ref()).indent(true).to_string();
+    let has_repartition =
+        plan_display.contains("RepartitionExec: partitioning=Hash([ts@0], 2)");
+    assert!(
+        has_repartition || plan_display.contains("SortPreservingMergeExec"),
+        "Expected either a repartition between aggregates or a sort-preserving merge chain"
+    );
+
+    // Execute the optimized plan to ensure the empty, multi-partition pipeline
+    // does not panic.
+    let batches = collect(physical_plan, ctx.task_ctx()).await?;
+    assert!(batches.is_empty());
+
+    Ok(())
+}
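For orientation, the DataFrame chain in this test corresponds roughly to the following SQL shape. This is a hedged reconstruction for illustration, not the literal reproducer from issue #18989; `metrics` is the two-partition table the test registers, and `inner_agg` and `cnt` are made-up names:

```rust
use datafusion::prelude::*;

// Sketch: the same Sort -> Aggregate(ts, region) -> Sort -> Aggregate(ts)
// shape expressed in SQL against the test's `metrics` table.
async fn chained_aggregates(ctx: &SessionContext) -> datafusion::error::Result<()> {
    let df = ctx
        .sql(
            "SELECT ts, COUNT(region) \
             FROM (SELECT ts, region, COUNT(value) AS cnt \
                   FROM metrics GROUP BY ts, region \
                   ORDER BY ts, region) AS inner_agg \
             GROUP BY ts",
        )
        .await?;
    // Before this fix, plans of this shape could trip SanityCheckPlan when
    // the source had more than one partition.
    df.collect().await?;
    Ok(())
}
```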
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index 988e14c28e17c..8aab3fd2ff0a6 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -58,7 +58,7 @@ pub use analysis::{AnalysisContext, ExprBoundaries, analyze};
 pub use equivalence::{
     AcrossPartitions, ConstExpr, EquivalenceProperties, calculate_union,
 };
-pub use partitioning::{Distribution, Partitioning};
+pub use partitioning::{Distribution, Partitioning, PartitioningSatisfaction};
 pub use physical_expr::{
     add_offset_to_expr, add_offset_to_physical_sort_exprs, create_lex_ordering,
     create_ordering, create_physical_sort_expr, create_physical_sort_exprs,
diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs
index 6120e1f3b5826..bf44618b21f5a 100644
--- a/datafusion/physical-optimizer/src/enforce_distribution.rs
+++ b/datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -40,7 +40,8 @@ use datafusion_expr::logical_plan::JoinType;
 use datafusion_physical_expr::expressions::{Column, NoOp};
 use datafusion_physical_expr::utils::map_columns_before_projection;
 use datafusion_physical_expr::{
-    EquivalenceProperties, PhysicalExpr, PhysicalExprRef, physical_exprs_equal,
+    EquivalenceProperties, PartitioningSatisfaction, PhysicalExpr, PhysicalExprRef,
+    physical_exprs_equal,
 };
 use datafusion_physical_plan::ExecutionPlanProperties;
 use datafusion_physical_plan::aggregates::{
@@ -831,6 +832,63 @@ fn new_join_conditions(
         .collect()
 }
 
+#[derive(Debug, Clone)]
+pub struct DistributionSatisfactionResult {
+    requirement: Distribution,
+    output_partitioning: Partitioning,
+    satisfaction: PartitioningSatisfaction,
+}
+
+impl DistributionSatisfactionResult {
+    pub fn is_satisfied(&self) -> bool {
+        self.satisfaction.is_satisfied()
+    }
+
+    pub fn satisfaction(&self) -> PartitioningSatisfaction {
+        self.satisfaction
+    }
+
+    pub fn output_partitioning(&self) -> &Partitioning {
+        &self.output_partitioning
+    }
+
+    pub fn required_distribution(&self) -> &Distribution {
+        &self.requirement
+    }
+
+    pub fn requires_repartition(
+        &self,
+        allow_subset: bool,
+        target_partitions: usize,
+    ) -> bool {
+        (match self.satisfaction {
+            PartitioningSatisfaction::Exact => false,
+            PartitioningSatisfaction::Subset => !allow_subset,
+            PartitioningSatisfaction::NotSatisfied => true,
+        }) || (!allow_subset
+            && target_partitions > self.output_partitioning.partition_count())
+    }
+}
+
+pub fn distribution_satisfaction(
+    plan: &Arc<dyn ExecutionPlan>,
+    requirement: &Distribution,
+    allow_subset: bool,
+) -> DistributionSatisfactionResult {
+    let output_partitioning = plan.output_partitioning().clone();
+    let satisfaction = output_partitioning.satisfaction(
+        requirement,
+        plan.equivalence_properties(),
+        allow_subset,
+    );
+
+    DistributionSatisfactionResult {
+        requirement: requirement.clone(),
+        output_partitioning,
+        satisfaction,
+    }
+}
+
 /// Adds RoundRobin repartition operator to the plan increase parallelism.
 ///
 /// # Arguments
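Driving the new helper outside the optimizer shows the intended API shape. A minimal sketch under the crate layout in this diff; using `EmptyExec` as a stand-in child is an assumption for the example, not code from the patch:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{Distribution, Partitioning, PartitioningSatisfaction};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_optimizer::enforce_distribution::distribution_satisfaction;
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::ExecutionPlan;

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]));
    let col_a: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
    let col_b: Arc<dyn PhysicalExpr> = Arc::new(Column::new("b", 1));

    // Child is hash-partitioned on [a] only.
    let child: Arc<dyn ExecutionPlan> = Arc::new(RepartitionExec::try_new(
        Arc::new(EmptyExec::new(schema)),
        Partitioning::Hash(vec![Arc::clone(&col_a)], 2),
    )?);

    // Requirement is on [a, b]: rows equal on (a, b) are equal on `a`, so
    // they are already co-located -- a Subset satisfaction.
    let requirement = Distribution::HashPartitioned(vec![col_a, col_b]);
    let relaxed = distribution_satisfaction(&child, &requirement, true);
    assert_eq!(relaxed.satisfaction(), PartitioningSatisfaction::Subset);
    assert!(!relaxed.requires_repartition(true, 4));

    // With subset satisfaction disabled, the same child must be repartitioned.
    let strict = distribution_satisfaction(&child, &requirement, false);
    assert!(strict.requires_repartition(false, 4));
    Ok(())
}
```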
@@ -890,7 +948,7 @@ fn add_roundrobin_on_top(
 /// distribution is satisfied by adding a Hash repartition.
 fn add_hash_on_top(
     input: DistributionContext,
-    hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
+    satisfaction: &DistributionSatisfactionResult,
     n_target: usize,
     allow_subset_satisfy_partitioning: bool,
 ) -> Result<DistributionContext> {
@@ -900,22 +958,11 @@ fn add_hash_on_top(
         return Ok(input);
     }
 
-    let dist = Distribution::HashPartitioned(hash_exprs);
-    let satisfaction = input.plan.output_partitioning().satisfaction(
-        &dist,
-        input.plan.equivalence_properties(),
-        allow_subset_satisfy_partitioning,
-    );
-
     // Add hash repartitioning when:
     // - When subset satisfaction is enabled (current >= threshold): only repartition if not satisfied
     // - When below threshold (current < threshold): repartition if expressions don't match OR to increase parallelism
-    let needs_repartition = if allow_subset_satisfy_partitioning {
-        !satisfaction.is_satisfied()
-    } else {
-        !satisfaction.is_satisfied()
-            || n_target > input.plan.output_partitioning().partition_count()
-    };
+    let needs_repartition =
+        satisfaction.requires_repartition(allow_subset_satisfy_partitioning, n_target);
 
     if needs_repartition {
         // When there is an existing ordering, we preserve ordering during
@@ -925,7 +972,10 @@ fn add_hash_on_top(
         // requirements.
         // - Usage of order preserving variants is not desirable (per the flag
         //   `config.optimizer.prefer_existing_sort`).
-        let partitioning = dist.create_partitioning(n_target);
+        let partitioning = satisfaction
+            .required_distribution()
+            .clone()
+            .create_partitioning(n_target);
         let repartition =
             RepartitionExec::try_new(Arc::clone(&input.plan), partitioning)?
                 .with_preserve_order();
@@ -1070,94 +1120,37 @@ struct RepartitionRequirementStatus {
     /// Designates whether round robin partitioning is beneficial according to
     /// the statistical information we have on the number of rows.
     roundrobin_beneficial_stats: bool,
-    /// Designates whether hash partitioning is necessary.
-    hash_necessary: bool,
 }
 
-/// Calculates the `RepartitionRequirementStatus` for each children to generate
+/// Calculates the `RepartitionRequirementStatus` for each child to generate
 /// consistent and sensible (in terms of performance) distribution requirements.
-/// As an example, a hash join's left (build) child might produce
-///
-/// ```text
-/// RepartitionRequirementStatus {
-///     ..,
-///     hash_necessary: true
-/// }
-/// ```
-///
-/// while its right (probe) child might have very few rows and produce:
-///
-/// ```text
-/// RepartitionRequirementStatus {
-///     ..,
-///     hash_necessary: false
-/// }
-/// ```
-///
-/// These statuses are not consistent as all children should agree on hash
-/// partitioning. This function aligns the statuses to generate consistent
-/// hash partitions for each children. After alignment, the right child's
-/// status would turn into:
-///
-/// ```text
-/// RepartitionRequirementStatus {
-///     ..,
-///     hash_necessary: true
-/// }
-/// ```
 fn get_repartition_requirement_status(
     plan: &Arc<dyn ExecutionPlan>,
     batch_size: usize,
     should_use_estimates: bool,
 ) -> Result<Vec<RepartitionRequirementStatus>> {
-    let mut needs_alignment = false;
     let children = plan.children();
     let rr_beneficial = plan.benefits_from_input_partitioning();
     let requirements = plan.required_input_distribution();
-    let mut repartition_status_flags = vec![];
-    for (child, requirement, roundrobin_beneficial) in
-        izip!(children.into_iter(), requirements, rr_beneficial)
-    {
-        // Decide whether adding a round robin is beneficial depending on
-        // the statistical information we have on the number of rows:
-        let roundrobin_beneficial_stats = match child.partition_statistics(None)?.num_rows
-        {
-            Precision::Exact(n_rows) => n_rows > batch_size,
-            Precision::Inexact(n_rows) => !should_use_estimates || (n_rows > batch_size),
-            Precision::Absent => true,
-        };
-        let is_hash = matches!(requirement, Distribution::HashPartitioned(_));
-        // Hash re-partitioning is necessary when the input has more than one
-        // partitions:
-        let multi_partitions = child.output_partitioning().partition_count() > 1;
-        let roundrobin_sensible = roundrobin_beneficial && roundrobin_beneficial_stats;
-        needs_alignment |= is_hash && (multi_partitions || roundrobin_sensible);
-        repartition_status_flags.push((
-            is_hash,
-            RepartitionRequirementStatus {
+    izip!(children.into_iter(), requirements, rr_beneficial)
+        .map(|(child, requirement, roundrobin_beneficial)| {
+            // Decide whether adding a round robin is beneficial depending on
+            // the statistical information we have on the number of rows:
+            let roundrobin_beneficial_stats =
+                match child.partition_statistics(None)?.num_rows {
+                    Precision::Exact(n_rows) => n_rows > batch_size,
+                    Precision::Inexact(n_rows) => {
+                        !should_use_estimates || (n_rows > batch_size)
+                    }
+                    Precision::Absent => true,
+                };
+            Ok(RepartitionRequirementStatus {
                 requirement,
                 roundrobin_beneficial,
                 roundrobin_beneficial_stats,
-                hash_necessary: is_hash && multi_partitions,
-            },
-        ));
-    }
-    // Align hash necessary flags for hash partitions to generate consistent
-    // hash partitions at each children:
-    if needs_alignment {
-        // When there is at least one hash requirement that is necessary or
-        // beneficial according to statistics, make all children require hash
-        // repartitioning:
-        for (is_hash, status) in &mut repartition_status_flags {
-            if *is_hash {
-                status.hash_necessary = true;
-            }
-        }
-    }
-    Ok(repartition_status_flags
-        .into_iter()
-        .map(|(_, status)| status)
-        .collect())
+            })
+        })
+        .collect()
 }
 
 /// This function checks whether we need to add additional data exchange
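With the alignment pass deleted, the only statistics-driven decision left in this function is the round-robin heuristic. Restated as a standalone sketch for reference (same logic as above, wrapped in a hypothetical free function):

```rust
use datafusion_common::stats::Precision;

// Round-robin repartitioning is considered beneficial when the row count
// exceeds one batch, when there is no estimate at all (Absent), or, for
// inexact counts, when estimates are not trusted anyway.
fn roundrobin_beneficial_stats(
    num_rows: &Precision<usize>,
    batch_size: usize,
    should_use_estimates: bool,
) -> bool {
    match num_rows {
        Precision::Exact(n) => *n > batch_size,
        Precision::Inexact(n) => !should_use_estimates || *n > batch_size,
        Precision::Absent => true,
    }
}

fn main() {
    assert!(roundrobin_beneficial_stats(&Precision::Absent, 8192, true));
    assert!(!roundrobin_beneficial_stats(&Precision::Exact(100), 8192, true));
    assert!(roundrobin_beneficial_stats(&Precision::Inexact(100), 8192, false));
}
```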
@@ -1281,7 +1274,6 @@ pub fn ensure_distribution(
                 requirement,
                 roundrobin_beneficial,
                 roundrobin_beneficial_stats,
-                hash_necessary,
             },
         )| {
             let increases_partition_count =
@@ -1321,13 +1313,19 @@ pub fn ensure_distribution(
                 Distribution::SinglePartition => {
                     child = add_merge_on_top(child);
                 }
-                Distribution::HashPartitioned(exprs) => {
-                    // See https://github.com/apache/datafusion/issues/18341#issuecomment-3503238325 for background
-                    // When inserting hash is necessary to satisfy hash requirement, insert hash repartition.
-                    if hash_necessary {
+                Distribution::HashPartitioned(_) => {
+                    let satisfaction = distribution_satisfaction(
+                        &child.plan,
+                        &requirement,
+                        allow_subset_satisfy_partitioning,
+                    );
+                    if satisfaction.requires_repartition(
+                        allow_subset_satisfy_partitioning,
+                        target_partitions,
+                    ) {
                         child = add_hash_on_top(
                             child,
-                            exprs.to_vec(),
+                            &satisfaction,
                             target_partitions,
                             allow_subset_satisfy_partitioning,
                         )?;
diff --git a/datafusion/physical-optimizer/src/sanity_checker.rs b/datafusion/physical-optimizer/src/sanity_checker.rs
index bff33a281556d..ae5fcc900c1ab 100644
--- a/datafusion/physical-optimizer/src/sanity_checker.rs
+++ b/datafusion/physical-optimizer/src/sanity_checker.rs
@@ -35,6 +35,7 @@ use datafusion_physical_plan::joins::SymmetricHashJoinExec;
 use datafusion_physical_plan::{ExecutionPlanProperties, get_plan_string};
 
 use crate::PhysicalOptimizerRule;
+use crate::enforce_distribution::distribution_satisfaction;
 use datafusion_physical_expr_common::sort_expr::format_physical_sort_requirement_list;
 use itertools::izip;
 
@@ -151,18 +152,15 @@ pub fn check_plan_sanity(
             }
         }
 
-        if !child
-            .output_partitioning()
-            .satisfaction(&dist_req, child_eq_props, true)
-            .is_satisfied()
-        {
+        let dist_satisfaction = distribution_satisfaction(child, &dist_req, true);
+        if !dist_satisfaction.is_satisfied() {
             let plan_str = get_plan_string(&plan);
             return plan_err!(
                 "Plan: {:?} does not satisfy distribution requirements: {}. Child-{} output partitioning: {}",
                 plan_str,
                 dist_req,
                 idx,
-                child.output_partitioning()
+                dist_satisfaction.output_partitioning()
            );
        }
    }
diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs
index fcd7d6376d21c..77f01a016c8df 100644
--- a/datafusion/sql/src/expr/mod.rs
+++ b/datafusion/sql/src/expr/mod.rs
@@ -233,7 +233,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
             }
             SQLExpr::Array(arr) => self.sql_array_literal(arr.elem, schema),
-            SQLExpr::Interval(interval) => self.sql_interval_to_expr(false, interval),
+            SQLExpr::Interval(interval) => Self::sql_interval_to_expr(false, interval),
             SQLExpr::Identifier(id) => {
                 self.sql_identifier_to_expr(id, schema, planner_context)
             }
diff --git a/datafusion/sql/src/expr/unary_op.rs b/datafusion/sql/src/expr/unary_op.rs
index cd118c0fdd5c5..0d921fe0cbee5 100644
--- a/datafusion/sql/src/expr/unary_op.rs
+++ b/datafusion/sql/src/expr/unary_op.rs
@@ -69,7 +69,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                     span: _,
                 }) => self.parse_sql_number(&n, true),
                 SQLExpr::Interval(interval) => {
-                    self.sql_interval_to_expr(true, interval)
+                    Self::sql_interval_to_expr(true, interval)
                 }
                 // Not a literal, apply negative operator on expression
                 _ => Ok(Expr::Negative(Box::new(self.sql_expr_to_logical_expr(
diff --git a/datafusion/sql/src/expr/value.rs b/datafusion/sql/src/expr/value.rs
index 7808f8cb5d553..6de2d729ad9f9 100644
--- a/datafusion/sql/src/expr/value.rs
+++ b/datafusion/sql/src/expr/value.rs
@@ -187,7 +187,6 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
     /// Convert a SQL interval expression to a DataFusion logical plan
     /// expression
     pub(super) fn sql_interval_to_expr(
-        &self,
         negative: bool,
         interval: Interval,
     ) -> Result<Expr> {
@@ -220,7 +219,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                 return not_impl_err!("Unsupported interval operator: {op:?}");
             }
         };
-        let left_expr = self.sql_interval_to_expr(
+        let left_expr = Self::sql_interval_to_expr(
             negative,
             Interval {
                 value: left,
@@ -230,7 +229,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                 fractional_seconds_precision: None,
             },
         )?;
-        let right_expr = self.sql_interval_to_expr(
+        let right_expr = Self::sql_interval_to_expr(
             false,
             Interval {
                 value: right,
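The `sql_interval_to_expr` hunks above are a mechanical refactor: the function never used `&self`, so it becomes an associated function, and both callers and the recursive calls switch to `Self::`. The pattern in miniature (hypothetical types, not the planner code):

```rust
struct Planner;

impl Planner {
    // Before: `fn negate(&self, negative: bool, v: i64) -> i64`, called as
    // `self.negate(...)`. After dropping the unused receiver, recursion and
    // callers go through `Self::` instead.
    fn negate(negative: bool, v: i64) -> i64 {
        if negative {
            -Self::negate(false, v)
        } else {
            v
        }
    }
}

fn main() {
    assert_eq!(Planner::negate(true, 5), -5);
    assert_eq!(Planner::negate(false, 5), 5);
}
```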
diff --git a/datafusion/sqllogictest/test_files/count_star_rule.slt b/datafusion/sqllogictest/test_files/count_star_rule.slt
index a1c0e6303a765..d54b8a3e6fecb 100644
--- a/datafusion/sqllogictest/test_files/count_star_rule.slt
+++ b/datafusion/sqllogictest/test_files/count_star_rule.slt
@@ -84,8 +84,9 @@ logical_plan
 physical_plan
 01)ProjectionExec: expr=[a@0 as a, count(Int64(1)) PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as count_a]
 02)--WindowAggExec: wdw=[count(Int64(1)) PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "count(Int64(1)) PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64 }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }]
-03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]
-04)------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
+04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
+05)--------DataSourceExec: partitions=1, partition_sizes=[1]
 
 query II
 SELECT a, COUNT() OVER (PARTITION BY a) AS count_a FROM t1 ORDER BY a;
diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt
index 853dbd2e24aac..a40aee97f1887 100644
--- a/datafusion/sqllogictest/test_files/cte.slt
+++ b/datafusion/sqllogictest/test_files/cte.slt
@@ -910,8 +910,10 @@ logical_plan
 05)------TableScan: person projection=[id]
 physical_plan
 01)HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(id@0, id@0)]
-02)--DataSourceExec: partitions=1, partition_sizes=[0]
-03)--DataSourceExec: partitions=1, partition_sizes=[0]
+02)--RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=1
+03)----DataSourceExec: partitions=1, partition_sizes=[0]
+04)--RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=1
+05)----DataSourceExec: partitions=1, partition_sizes=[0]
 
 statement count 0
 drop table person;
diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt
index 9215ce87e3bef..2eceb6cce6bf5 100644
--- a/datafusion/sqllogictest/test_files/explain_tree.slt
+++ b/datafusion/sqllogictest/test_files/explain_tree.slt
@@ -1204,12 +1204,21 @@ physical_plan
 09)│          c1@0 ASC         ││          c1@0 ASC         │
 10)└─────────────┬─────────────┘└─────────────┬─────────────┘
 11)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
-12)│      DataSourceExec       ││      DataSourceExec       │
+12)│      RepartitionExec      ││      RepartitionExec      │
 13)│    --------------------   ││    --------------------   │
-14)│        bytes: 5932        ││        bytes: 5932        │
-15)│      format: memory       ││      format: memory       │
-16)│          rows: 1          ││          rows: 1          │
-17)└───────────────────────────┘└───────────────────────────┘
+14)│ partition_count(in->out): ││ partition_count(in->out): │
+15)│          1 -> 4           ││          1 -> 4           │
+16)│                           ││                           │
+17)│    partitioning_scheme:   ││    partitioning_scheme:   │
+18)│      Hash([c1@0], 4)      ││      Hash([c1@0], 4)      │
+19)└─────────────┬─────────────┘└─────────────┬─────────────┘
+20)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
+21)│      DataSourceExec       ││      DataSourceExec       │
+22)│    --------------------   ││    --------------------   │
+23)│        bytes: 5932        ││        bytes: 5932        │
+24)│      format: memory       ││      format: memory       │
+25)│          rows: 1          ││          rows: 1          │
+26)└───────────────────────────┘└───────────────────────────┘
 
 statement ok
 set datafusion.optimizer.prefer_hash_join = true;
diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt
index 524304546d569..8f4df2b06f040 100644
--- a/datafusion/sqllogictest/test_files/limit.slt
+++ b/datafusion/sqllogictest/test_files/limit.slt
@@ -404,9 +404,9 @@ logical_plan
 02)--TableScan: t1000 projection=[i]
 physical_plan
 01)AggregateExec: mode=FinalPartitioned, gby=[i@0 as i], aggr=[]
-02)--RepartitionExec: partitioning=Hash([i@0], 4), input_partitions=1
+02)--RepartitionExec: partitioning=Hash([i@0], 4), input_partitions=4
 03)----AggregateExec: mode=Partial, gby=[i@0 as i], aggr=[]
-04)------DataSourceExec: partitions=1
+04)------DataSourceExec: partitions=4
 
 statement ok
 set datafusion.explain.show_sizes = true;
diff --git a/datafusion/sqllogictest/test_files/qualify.slt b/datafusion/sqllogictest/test_files/qualify.slt
index ce58e3998cf57..0053646abffa2 100644
--- a/datafusion/sqllogictest/test_files/qualify.slt
+++ b/datafusion/sqllogictest/test_files/qualify.slt
@@ -292,8 +292,9 @@ physical_plan
 02)--FilterExec: row_number() PARTITION BY [users.dept] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@0 > 1
 03)----ProjectionExec: expr=[row_number() PARTITION BY [users.dept] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as row_number() PARTITION BY [users.dept] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]
 04)------BoundedWindowAggExec: wdw=[row_number() PARTITION BY [users.dept] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() PARTITION BY [users.dept] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
-05)--------SortExec: expr=[dept@0 ASC NULLS LAST], preserve_partitioning=[false]
-06)----------DataSourceExec: partitions=1, partition_sizes=[1]
+05)--------SortExec: expr=[dept@0 ASC NULLS LAST], preserve_partitioning=[true]
+06)----------RepartitionExec: partitioning=Hash([dept@0], 4), input_partitions=1
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
 
 # plan with window function and group by
 query TT
diff --git a/datafusion/sqllogictest/test_files/sort_merge_join.slt b/datafusion/sqllogictest/test_files/sort_merge_join.slt
index d2fa37ef76da8..c158e21ea9f16 100644
--- a/datafusion/sqllogictest/test_files/sort_merge_join.slt
+++ b/datafusion/sqllogictest/test_files/sort_merge_join.slt
@@ -38,10 +38,12 @@ logical_plan
 03)--TableScan: t2 projection=[a, b]
 physical_plan
 01)SortMergeJoinExec: join_type=Inner, on=[(a@0, a@0)], filter=CAST(b@1 AS Int64) * 50 <= CAST(b@0 AS Int64)
-02)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
-03)----DataSourceExec: partitions=1, partition_sizes=[1]
-04)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
-05)----DataSourceExec: partitions=1, partition_sizes=[1]
+02)--SortExec: expr=[a@0 ASC], preserve_partitioning=[true]
+03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
+04)------DataSourceExec: partitions=1, partition_sizes=[1]
+05)--SortExec: expr=[a@0 ASC], preserve_partitioning=[true]
+06)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
+07)------DataSourceExec: partitions=1, partition_sizes=[1]
 
 # inner join with join filter
 query TITI rowsort
@@ -918,14 +920,14 @@ statement ok
 CREATE TABLE t3(x int) AS VALUES (1);
 
 query IIIII
-SELECT * FROM t2 RIGHT JOIN t1 on t1.a = t2.a AND t1.b < t2.b
+SELECT * FROM t2 RIGHT JOIN t1 on t1.a = t2.a AND t1.b < t2.b order by 1, 4, 5
 ----
-NULL NULL NULL 1 100
 2 250 3001 2 200
+NULL NULL NULL 1 100
 NULL NULL NULL 3 300
 
 query IIIII
-SELECT * FROM t1 LEFT JOIN t2 on t1.a = t2.a AND t1.b < t2.b
+SELECT * FROM t1 LEFT JOIN t2 on t1.a = t2.a AND t1.b < t2.b order by 1
 ----
 1 100 NULL NULL NULL
 2 200 2 250 3001
diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt
index b79b6d2fe5e9e..29b03c5e2506a 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -613,12 +613,14 @@ logical_plan
 physical_plan
 01)UnionExec
 02)--ProjectionExec: expr=[Int64(1)@0 as a]
-03)----AggregateExec: mode=SinglePartitioned, gby=[1 as Int64(1)], aggr=[], ordering_mode=Sorted
-04)------PlaceholderRowExec
-05)--ProjectionExec: expr=[2 as a]
-06)----PlaceholderRowExec
-07)--ProjectionExec: expr=[3 as a]
+03)----AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1)], aggr=[], ordering_mode=Sorted
+04)------RepartitionExec: partitioning=Hash([Int64(1)@0], 4), input_partitions=1
+05)--------AggregateExec: mode=Partial, gby=[1 as Int64(1)], aggr=[], ordering_mode=Sorted
+06)----------PlaceholderRowExec
+07)--ProjectionExec: expr=[2 as a]
 08)----PlaceholderRowExec
+09)--ProjectionExec: expr=[3 as a]
+10)----PlaceholderRowExec
 
 # test UNION ALL aliases correctly with aliased subquery
 query TT
@@ -641,12 +643,14 @@ logical_plan
 physical_plan
 01)UnionExec
 02)--ProjectionExec: expr=[count(Int64(1))@1 as count, n@0 as n]
-03)----AggregateExec: mode=SinglePartitioned, gby=[n@0 as n], aggr=[count(Int64(1))], ordering_mode=Sorted
-04)------ProjectionExec: expr=[5 as n]
-05)--------PlaceholderRowExec
-06)--ProjectionExec: expr=[1 as count, max(Int64(10))@0 as n]
-07)----AggregateExec: mode=Single, gby=[], aggr=[max(Int64(10))]
-08)------PlaceholderRowExec
+03)----AggregateExec: mode=FinalPartitioned, gby=[n@0 as n], aggr=[count(Int64(1))], ordering_mode=Sorted
+04)------RepartitionExec: partitioning=Hash([n@0], 4), input_partitions=1
+05)--------AggregateExec: mode=Partial, gby=[n@0 as n], aggr=[count(Int64(1))], ordering_mode=Sorted
+06)----------ProjectionExec: expr=[5 as n]
+07)------------PlaceholderRowExec
+08)--ProjectionExec: expr=[1 as count, max(Int64(10))@0 as n]
+09)----AggregateExec: mode=Single, gby=[], aggr=[max(Int64(10))]
+10)------PlaceholderRowExec
 
 # Test issue: https://github.com/apache/datafusion/issues/11409
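Tying the `.slt` expectations back to executable code: the `count_star_rule.slt` change can be observed end to end with a small driver. This is a sketch, not part of the patch; it assumes the four target partitions that the sqllogictest setup uses:

```rust
use datafusion::physical_plan::displayable;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let config = SessionConfig::new().with_target_partitions(4);
    let ctx = SessionContext::new_with_config(config);
    ctx.sql("CREATE TABLE t1(a INT) AS VALUES (1), (2), (3)")
        .await?
        .collect()
        .await?;

    let df = ctx
        .sql("SELECT a, COUNT() OVER (PARTITION BY a) AS count_a FROM t1")
        .await?;
    let plan = df.create_physical_plan().await?;
    let text = displayable(plan.as_ref()).indent(true).to_string();

    // Matches plan line 04 of the updated count_star_rule.slt expectation.
    assert!(text.contains("RepartitionExec: partitioning=Hash([a@0], 4)"));
    Ok(())
}
```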