From ee13b64003963e038ce060d942bc19a5a149af51 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 10 Jan 2025 12:28:49 +0000 Subject: [PATCH 1/8] Add sum statistic --- datafusion/common/src/stats.rs | 85 ++++++++-- datafusion/core/src/datasource/statistics.rs | 9 +- .../tests/custom_sources_cases/statistics.rs | 16 +- datafusion/physical-plan/src/common.rs | 3 + datafusion/physical-plan/src/filter.rs | 33 ++-- .../physical-plan/src/joins/cross_join.rs | 87 +++++++---- datafusion/physical-plan/src/joins/utils.rs | 145 ++++++++---------- datafusion/physical-plan/src/projection.rs | 7 + datafusion/physical-plan/src/union.rs | 50 +++--- datafusion/physical-plan/src/values.rs | 14 +- .../proto/datafusion_common.proto | 11 +- datafusion/proto-common/src/from_proto/mod.rs | 8 +- .../proto-common/src/generated/pbjson.rs | 54 +++---- .../proto-common/src/generated/prost.rs | 16 +- datafusion/proto-common/src/to_proto/mod.rs | 3 +- .../src/generated/datafusion_proto_common.rs | 16 +- 16 files changed, 306 insertions(+), 251 deletions(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index d2ce965c5c49..3c7ed0af1a1b 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -21,7 +21,7 @@ use std::fmt::{self, Debug, Display}; use crate::{Result, ScalarValue}; -use arrow_schema::{Schema, SchemaRef}; +use arrow_schema::{DataType, Schema, SchemaRef}; /// Represents a value with a degree of certainty. `Precision` is used to /// propagate information the precision of statistical values. @@ -170,24 +170,63 @@ impl Precision { pub fn add(&self, other: &Precision) -> Precision { match (self, other) { (Precision::Exact(a), Precision::Exact(b)) => { - if let Ok(result) = a.add(b) { - Precision::Exact(result) - } else { - Precision::Absent - } + a.add(b).map(Precision::Exact).unwrap_or(Precision::Absent) } (Precision::Inexact(a), Precision::Exact(b)) | (Precision::Exact(a), Precision::Inexact(b)) - | (Precision::Inexact(a), Precision::Inexact(b)) => { - if let Ok(result) = a.add(b) { - Precision::Inexact(result) - } else { - Precision::Absent - } + | (Precision::Inexact(a), Precision::Inexact(b)) => a + .add(b) + .map(Precision::Inexact) + .unwrap_or(Precision::Absent), + (_, _) => Precision::Absent, + } + } + + /// Calculates the difference of two (possibly inexact) [`ScalarValue`] values, + /// conservatively propagating exactness information. If one of the input + /// values is [`Precision::Absent`], the result is `Absent` too. + pub fn sub(&self, other: &Precision) -> Precision { + match (self, other) { + (Precision::Exact(a), Precision::Exact(b)) => { + a.add(b).map(Precision::Exact).unwrap_or(Precision::Absent) } + (Precision::Inexact(a), Precision::Exact(b)) + | (Precision::Exact(a), Precision::Inexact(b)) + | (Precision::Inexact(a), Precision::Inexact(b)) => a + .add(b) + .map(Precision::Inexact) + .unwrap_or(Precision::Absent), + (_, _) => Precision::Absent, + } + } + + /// Calculates the multiplication of two (possibly inexact) [`ScalarValue`] values, + /// conservatively propagating exactness information. If one of the input + /// values is [`Precision::Absent`], the result is `Absent` too. + pub fn multiply(&self, other: &Precision) -> Precision { + match (self, other) { + (Precision::Exact(a), Precision::Exact(b)) => a + .mul_checked(b) + .map(Precision::Exact) + .unwrap_or(Precision::Absent), + (Precision::Inexact(a), Precision::Exact(b)) + | (Precision::Exact(a), Precision::Inexact(b)) + | (Precision::Inexact(a), Precision::Inexact(b)) => a + .mul_checked(b) + .map(Precision::Inexact) + .unwrap_or(Precision::Absent), (_, _) => Precision::Absent, } } + + /// Casts the value to the given data type, propagating exactness information. + pub fn cast_to(&self, data_type: &DataType) -> Result> { + match self { + Precision::Exact(value) => value.cast_to(data_type).map(Precision::Exact), + Precision::Inexact(value) => value.cast_to(data_type).map(Precision::Inexact), + Precision::Absent => Ok(Precision::Absent), + } + } } impl Debug for Precision { @@ -210,6 +249,18 @@ impl Display for Precision { } } +impl From> for Precision { + fn from(value: Precision) -> Self { + match value { + Precision::Exact(v) => Precision::Exact(ScalarValue::UInt64(Some(v as u64))), + Precision::Inexact(v) => { + Precision::Inexact(ScalarValue::UInt64(Some(v as u64))) + } + Precision::Absent => Precision::Absent, + } + } +} + /// Statistics for a relation /// Fields are optional and can be inexact because the sources /// sometimes provide approximate estimates for performance reasons @@ -401,6 +452,11 @@ impl Display for Statistics { } else { s }; + let s = if cs.sum_value != Precision::Absent { + format!("{} Sum={}", s, cs.sum_value) + } else { + s + }; let s = if cs.null_count != Precision::Absent { format!("{} Null={}", s, cs.null_count) } else { @@ -436,6 +492,8 @@ pub struct ColumnStatistics { pub max_value: Precision, /// Minimum value of column pub min_value: Precision, + /// Sum value of a column + pub sum_value: Precision, /// Number of distinct values pub distinct_count: Precision, } @@ -458,6 +516,7 @@ impl ColumnStatistics { null_count: Precision::Absent, max_value: Precision::Absent, min_value: Precision::Absent, + sum_value: Precision::Absent, distinct_count: Precision::Absent, } } @@ -469,6 +528,7 @@ impl ColumnStatistics { self.null_count = self.null_count.to_inexact(); self.max_value = self.max_value.to_inexact(); self.min_value = self.min_value.to_inexact(); + self.sum_value = self.sum_value.to_inexact(); self.distinct_count = self.distinct_count.to_inexact(); self } @@ -646,6 +706,7 @@ mod tests { null_count: Precision::Exact(null_count), max_value: Precision::Exact(ScalarValue::Int64(Some(42))), min_value: Precision::Exact(ScalarValue::Int64(Some(64))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(4600))), distinct_count: Precision::Exact(100), } } diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs index 201bbfd5c007..f81e7bb916de 100644 --- a/datafusion/core/src/datasource/statistics.rs +++ b/datafusion/core/src/datasource/statistics.rs @@ -73,9 +73,7 @@ pub async fn get_statistics_with_limit( for (index, file_column) in file_stats.column_statistics.clone().into_iter().enumerate() { - col_stats_set[index].null_count = file_column.null_count; - col_stats_set[index].max_value = file_column.max_value; - col_stats_set[index].min_value = file_column.min_value; + col_stats_set[index] = file_column; } // If the number of rows exceeds the limit, we can stop processing @@ -113,12 +111,14 @@ pub async fn get_statistics_with_limit( null_count: file_nc, max_value: file_max, min_value: file_min, + sum_value: file_sum, distinct_count: _, } = file_col_stats; col_stats.null_count = add_row_stats(*file_nc, col_stats.null_count); set_max_if_greater(file_max, &mut col_stats.max_value); - set_min_if_lesser(file_min, &mut col_stats.min_value) + set_min_if_lesser(file_min, &mut col_stats.min_value); + col_stats.sum_value = file_sum.add(&col_stats.sum_value); } // If the number of rows exceeds the limit, we can stop processing @@ -204,6 +204,7 @@ pub(crate) fn get_col_stats( null_count: null_counts[i], max_value: max_value.map(Precision::Exact).unwrap_or(Precision::Absent), min_value: min_value.map(Precision::Exact).unwrap_or(Precision::Absent), + sum_value: Precision::Absent, distinct_count: Precision::Absent, } }) diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index 9d3bd594a929..b937b505bbda 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -26,18 +26,17 @@ use datafusion::{ error::Result, logical_expr::Expr, physical_plan::{ - ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, - PlanProperties, SendableRecordBatchStream, Statistics, + ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, + Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, }, prelude::SessionContext, scalar::ScalarValue, }; -use datafusion_catalog::Session; use datafusion_common::{project_schema, stats::Precision}; use datafusion_physical_expr::EquivalenceProperties; -use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; use async_trait::async_trait; +use datafusion_catalog::Session; /// This is a testing structure for statistics /// It will act both as a table provider and execution plan @@ -65,11 +64,12 @@ impl StatisticsValidation { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. fn compute_properties(schema: SchemaRef) -> PlanProperties { + let eq_properties = EquivalenceProperties::new(schema); + PlanProperties::new( - EquivalenceProperties::new(schema), + eq_properties, Partitioning::UnknownPartitioning(2), - EmissionType::Incremental, - Boundedness::Bounded, + ExecutionMode::Bounded, ) } } @@ -200,12 +200,14 @@ fn fully_defined() -> (Statistics, Schema) { distinct_count: Precision::Exact(2), max_value: Precision::Exact(ScalarValue::Int32(Some(1023))), min_value: Precision::Exact(ScalarValue::Int32(Some(-24))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(10))), null_count: Precision::Exact(0), }, ColumnStatistics { distinct_count: Precision::Exact(13), max_value: Precision::Exact(ScalarValue::Int64(Some(5486))), min_value: Precision::Exact(ScalarValue::Int64(Some(-6783))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(10))), null_count: Precision::Exact(5), }, ], diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs index aefb90d1d1b7..20a4e89dba94 100644 --- a/datafusion/physical-plan/src/common.rs +++ b/datafusion/physical-plan/src/common.rs @@ -333,12 +333,14 @@ mod tests { distinct_count: Precision::Absent, max_value: Precision::Absent, min_value: Precision::Absent, + sum_value: Precision::Absent, null_count: Precision::Exact(0), }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Absent, min_value: Precision::Absent, + sum_value: Precision::Absent, null_count: Precision::Exact(0), }, ], @@ -371,6 +373,7 @@ mod tests { distinct_count: Precision::Absent, max_value: Precision::Absent, min_value: Precision::Absent, + sum_value: Precision::Absent, null_count: Precision::Exact(3), }], }; diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 8e7c14f0baed..f15336f0edb1 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -45,8 +45,7 @@ use datafusion_physical_expr::expressions::BinaryExpr; use datafusion_physical_expr::intervals::utils::check_support; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{ - analyze, split_conjunction, AcrossPartitions, AnalysisContext, ConstExpr, - ExprBoundaries, PhysicalExpr, + analyze, split_conjunction, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, }; use crate::execution_plan::CardinalityEffect; @@ -219,23 +218,13 @@ impl FilterExec { if binary.op() == &Operator::Eq { // Filter evaluates to single value for all partitions if input_eqs.is_expr_constant(binary.left()) { - let (expr, across_parts) = ( - binary.right(), - input_eqs.get_expr_constant_value(binary.right()), - ); res_constants.push( - ConstExpr::new(Arc::clone(expr)) - .with_across_partitions(across_parts), - ); + ConstExpr::from(binary.right()).with_across_partitions(true), + ) } else if input_eqs.is_expr_constant(binary.right()) { - let (expr, across_parts) = ( - binary.left(), - input_eqs.get_expr_constant_value(binary.left()), - ); res_constants.push( - ConstExpr::new(Arc::clone(expr)) - .with_across_partitions(across_parts), - ); + ConstExpr::from(binary.left()).with_across_partitions(true), + ) } } } @@ -263,12 +252,8 @@ impl FilterExec { .into_iter() .filter(|column| stats.column_statistics[column.index()].is_singleton()) .map(|column| { - let value = stats.column_statistics[column.index()] - .min_value - .get_value(); let expr = Arc::new(column) as _; - ConstExpr::new(expr) - .with_across_partitions(AcrossPartitions::Uniform(value.cloned())) + ConstExpr::new(expr).with_across_partitions(true) }); // This is for statistics eq_properties = eq_properties.with_constants(constants); @@ -287,12 +272,10 @@ impl FilterExec { output_partitioning.project(&projection_mapping, &eq_properties); eq_properties = eq_properties.project(&projection_mapping, out_schema); } - Ok(PlanProperties::new( eq_properties, output_partitioning, - input.pipeline_behavior(), - input.boundedness(), + input.execution_mode(), )) } } @@ -431,6 +414,7 @@ fn collect_new_statistics( null_count: input_column_stats[idx].null_count.to_inexact(), max_value, min_value, + sum_value: Precision::Absent, distinct_count: distinct_count.to_inexact(), } }, @@ -1149,6 +1133,7 @@ mod tests { null_count: Precision::Absent, min_value: Precision::Inexact(ScalarValue::Int32(Some(5))), max_value: Precision::Inexact(ScalarValue::Int32(Some(10))), + sum_value: Precision::Absent, distinct_count: Precision::Absent, }], }; diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 69300fce7745..9ecd1a6e7f07 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -19,16 +19,16 @@ //! and producing batches in parallel for the right partitions use super::utils::{ - adjust_right_output_partitioning, reorder_output_after_swap, BatchSplitter, - BatchTransformer, BuildProbeJoinMetrics, NoopBatchTransformer, OnceAsync, OnceFut, + adjust_right_output_partitioning, BatchSplitter, BatchTransformer, + BuildProbeJoinMetrics, NoopBatchTransformer, OnceAsync, OnceFut, StatefulStreamResult, }; use crate::coalesce_partitions::CoalescePartitionsExec; -use crate::execution_plan::{boundedness_from_children, EmissionType}; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::{ - handle_state, ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, - ExecutionPlan, ExecutionPlanProperties, PlanProperties, RecordBatchStream, + execution_mode_from_children, handle_state, ColumnStatistics, DisplayAs, + DisplayFormatType, Distribution, ExecutionMode, ExecutionPlan, + ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; use arrow::compute::concat_batches; @@ -161,25 +161,14 @@ impl CrossJoinExec { left.schema().fields.len(), ); - PlanProperties::new( - eq_properties, - output_partitioning, - EmissionType::Final, - boundedness_from_children([left, right]), - ) - } + // Determine the execution mode: + let mut mode = execution_mode_from_children([left, right]); + if mode.is_unbounded() { + // If any of the inputs is unbounded, cross join breaks the pipeline. + mode = ExecutionMode::PipelineBreaking; + } - /// Returns a new `ExecutionPlan` that computes the same join as this one, - /// with the left and right inputs swapped using the specified - /// `partition_mode`. - pub fn swap_inputs(&self) -> Result> { - let new_join = - CrossJoinExec::new(Arc::clone(&self.right), Arc::clone(&self.left)); - reorder_output_after_swap( - Arc::new(new_join), - &self.left.schema(), - &self.right.schema(), - ) + PlanProperties::new(eq_properties, output_partitioning, mode) } } @@ -365,12 +354,36 @@ fn stats_cartesian_product( distinct_count: s.distinct_count, min_value: s.min_value, max_value: s.max_value, + sum_value: s + .sum_value + .get_value() + // Cast the row count into the same type as any existing sum value + .and_then(|v| { + Precision::::from(right_row_count) + .cast_to(&v.data_type()) + .ok() + }) + .map(|row_count| s.sum_value.multiply(&row_count)) + .unwrap_or(Precision::Absent), }) - .chain(right_col_stats.into_iter().map(|s| ColumnStatistics { - null_count: s.null_count.multiply(&left_row_count), - distinct_count: s.distinct_count, - min_value: s.min_value, - max_value: s.max_value, + .chain(right_col_stats.into_iter().map(|s| { + ColumnStatistics { + null_count: s.null_count.multiply(&left_row_count), + distinct_count: s.distinct_count, + min_value: s.min_value, + max_value: s.max_value, + sum_value: s + .sum_value + .get_value() + // Cast the row count into the same type as any existing sum value + .and_then(|v| { + Precision::::from(left_row_count) + .cast_to(&v.data_type()) + .ok() + }) + .map(|row_count| s.sum_value.multiply(&row_count)) + .unwrap_or(Precision::Absent), + } })) .collect(); @@ -604,12 +617,14 @@ mod tests { distinct_count: Precision::Exact(5), max_value: Precision::Exact(ScalarValue::Int64(Some(21))), min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(42))), null_count: Precision::Exact(0), }, ColumnStatistics { distinct_count: Precision::Exact(1), max_value: Precision::Exact(ScalarValue::from("x")), min_value: Precision::Exact(ScalarValue::from("a")), + sum_value: Precision::Absent, null_count: Precision::Exact(3), }, ], @@ -622,6 +637,7 @@ mod tests { distinct_count: Precision::Exact(3), max_value: Precision::Exact(ScalarValue::Int64(Some(12))), min_value: Precision::Exact(ScalarValue::Int64(Some(0))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(20))), null_count: Precision::Exact(2), }], }; @@ -636,18 +652,25 @@ mod tests { distinct_count: Precision::Exact(5), max_value: Precision::Exact(ScalarValue::Int64(Some(21))), min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), + sum_value: Precision::Exact(ScalarValue::Int64(Some( + 42 * right_row_count as i64, + ))), null_count: Precision::Exact(0), }, ColumnStatistics { distinct_count: Precision::Exact(1), max_value: Precision::Exact(ScalarValue::from("x")), min_value: Precision::Exact(ScalarValue::from("a")), + sum_value: Precision::Absent, null_count: Precision::Exact(3 * right_row_count), }, ColumnStatistics { distinct_count: Precision::Exact(3), max_value: Precision::Exact(ScalarValue::Int64(Some(12))), min_value: Precision::Exact(ScalarValue::Int64(Some(0))), + sum_value: Precision::Exact(ScalarValue::Int64(Some( + 20 * left_row_count as i64, + ))), null_count: Precision::Exact(2 * left_row_count), }, ], @@ -668,12 +691,14 @@ mod tests { distinct_count: Precision::Exact(5), max_value: Precision::Exact(ScalarValue::Int64(Some(21))), min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(42))), null_count: Precision::Exact(0), }, ColumnStatistics { distinct_count: Precision::Exact(1), max_value: Precision::Exact(ScalarValue::from("x")), min_value: Precision::Exact(ScalarValue::from("a")), + sum_value: Precision::Absent, null_count: Precision::Exact(3), }, ], @@ -686,6 +711,7 @@ mod tests { distinct_count: Precision::Exact(3), max_value: Precision::Exact(ScalarValue::Int64(Some(12))), min_value: Precision::Exact(ScalarValue::Int64(Some(0))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(20))), null_count: Precision::Exact(2), }], }; @@ -700,18 +726,23 @@ mod tests { distinct_count: Precision::Exact(5), max_value: Precision::Exact(ScalarValue::Int64(Some(21))), min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), + sum_value: Precision::Absent, // we don't know the row count on the right null_count: Precision::Absent, // we don't know the row count on the right }, ColumnStatistics { distinct_count: Precision::Exact(1), max_value: Precision::Exact(ScalarValue::from("x")), min_value: Precision::Exact(ScalarValue::from("a")), + sum_value: Precision::Absent, null_count: Precision::Absent, // we don't know the row count on the right }, ColumnStatistics { distinct_count: Precision::Exact(3), max_value: Precision::Exact(ScalarValue::Int64(Some(12))), min_value: Precision::Exact(ScalarValue::Int64(Some(0))), + sum_value: Precision::Exact(ScalarValue::Int64(Some( + 20 * left_row_count as i64, + ))), null_count: Precision::Exact(2 * left_row_count), }, ], diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 371949a32598..fca806c95b71 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -29,8 +29,6 @@ use crate::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder}; use crate::{ ColumnStatistics, ExecutionPlan, ExecutionPlanProperties, Partitioning, Statistics, }; -// compatibility -pub use super::join_filter::JoinFilter; use arrow::array::{ downcast_array, new_null_array, Array, BooleanBufferBuilder, UInt32Array, @@ -56,7 +54,6 @@ use datafusion_physical_expr::{ LexOrdering, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, }; -use crate::projection::ProjectionExec; use futures::future::{BoxFuture, Shared}; use futures::{ready, FutureExt}; use hashbrown::raw::RawTable; @@ -552,6 +549,66 @@ pub struct ColumnIndex { pub side: JoinSide, } +/// Filter applied before join output. Fields are crate-public to allow +/// downstream implementations to experiment with custom joins. +#[derive(Debug, Clone)] +pub struct JoinFilter { + /// Filter expression + pub(crate) expression: Arc, + /// Column indices required to construct intermediate batch for filtering + pub(crate) column_indices: Vec, + /// Physical schema of intermediate batch + pub(crate) schema: Schema, +} + +impl JoinFilter { + /// Creates new JoinFilter + pub fn new( + expression: Arc, + column_indices: Vec, + schema: Schema, + ) -> JoinFilter { + JoinFilter { + expression, + column_indices, + schema, + } + } + + /// Helper for building ColumnIndex vector from left and right indices + pub fn build_column_indices( + left_indices: Vec, + right_indices: Vec, + ) -> Vec { + left_indices + .into_iter() + .map(|i| ColumnIndex { + index: i, + side: JoinSide::Left, + }) + .chain(right_indices.into_iter().map(|i| ColumnIndex { + index: i, + side: JoinSide::Right, + })) + .collect() + } + + /// Filter expression + pub fn expression(&self) -> &Arc { + &self.expression + } + + /// Column indices for intermediate batch creation + pub fn column_indices(&self) -> &[ColumnIndex] { + &self.column_indices + } + + /// Intermediate batch schema + pub fn schema(&self) -> &Schema { + &self.schema + } +} + /// Returns the output field given the input field. Outer joins may /// insert nulls even if the input was not null /// @@ -1588,7 +1645,7 @@ macro_rules! handle_state { /// Represents the result of a stateful operation. /// -/// This enumeration indicates whether the state produced a result that is +/// This enumueration indicates whether the state produced a result that is /// ready for use (`Ready`) or if the operation requires continuation (`Continue`). /// /// Variants: @@ -1731,50 +1788,6 @@ impl BatchTransformer for BatchSplitter { } } -/// When the order of the join inputs are changed, the output order of columns -/// must remain the same. -/// -/// Joins output columns from their left input followed by their right input. -/// Thus if the inputs are reordered, the output columns must be reordered to -/// match the original order. -pub(crate) fn reorder_output_after_swap( - plan: Arc, - left_schema: &Schema, - right_schema: &Schema, -) -> Result> { - let proj = ProjectionExec::try_new( - swap_reverting_projection(left_schema, right_schema), - plan, - )?; - Ok(Arc::new(proj)) -} - -/// When the order of the join is changed, the output order of columns must -/// remain the same. -/// -/// Returns the expressions that will allow to swap back the values from the -/// original left as the first columns and those on the right next. -fn swap_reverting_projection( - left_schema: &Schema, - right_schema: &Schema, -) -> Vec<(Arc, String)> { - let right_cols = right_schema.fields().iter().enumerate().map(|(i, f)| { - ( - Arc::new(Column::new(f.name(), i)) as Arc, - f.name().to_owned(), - ) - }); - let right_len = right_cols.len(); - let left_cols = left_schema.fields().iter().enumerate().map(|(i, f)| { - ( - Arc::new(Column::new(f.name(), right_len + i)) as Arc, - f.name().to_owned(), - ) - }); - - left_cols.chain(right_cols).collect() -} - #[cfg(test)] mod tests { use std::pin::Pin; @@ -1983,6 +1996,7 @@ mod tests { distinct_count, min_value: min.map(ScalarValue::from), max_value: max.map(ScalarValue::from), + sum_value: Absent, null_count, } } @@ -2741,39 +2755,4 @@ mod tests { assert!(splitter.next().is_none()); assert_split_batches(batches, batch_size, num_rows); } - - #[tokio::test] - async fn test_swap_reverting_projection() { - let left_schema = Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Int32, false), - ]); - - let right_schema = Schema::new(vec![Field::new("c", DataType::Int32, false)]); - - let proj = swap_reverting_projection(&left_schema, &right_schema); - - assert_eq!(proj.len(), 3); - - let (col, name) = &proj[0]; - assert_eq!(name, "a"); - assert_col_expr(col, "a", 1); - - let (col, name) = &proj[1]; - assert_eq!(name, "b"); - assert_col_expr(col, "b", 2); - - let (col, name) = &proj[2]; - assert_eq!(name, "c"); - assert_col_expr(col, "c", 0); - } - - fn assert_col_expr(expr: &Arc, name: &str, index: usize) { - let col = expr - .as_any() - .downcast_ref::() - .expect("Projection items should be Column expression"); - assert_eq!(col.name(), name); - assert_eq!(col.index(), index); - } } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index e37a6b0dfb85..b498ef9af6f6 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -388,18 +388,21 @@ mod tests { distinct_count: Precision::Exact(5), max_value: Precision::Exact(ScalarValue::Int64(Some(21))), min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(42))), null_count: Precision::Exact(0), }, ColumnStatistics { distinct_count: Precision::Exact(1), max_value: Precision::Exact(ScalarValue::from("x")), min_value: Precision::Exact(ScalarValue::from("a")), + sum_value: Precision::Absent, null_count: Precision::Exact(3), }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))), min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))), + sum_value: Precision::Exact(ScalarValue::Float32(Some(5.5))), null_count: Precision::Absent, }, ], @@ -432,12 +435,14 @@ mod tests { distinct_count: Precision::Exact(1), max_value: Precision::Exact(ScalarValue::from("x")), min_value: Precision::Exact(ScalarValue::from("a")), + sum_value: Precision::Absent, null_count: Precision::Exact(3), }, ColumnStatistics { distinct_count: Precision::Exact(5), max_value: Precision::Exact(ScalarValue::Int64(Some(21))), min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(42))), null_count: Precision::Exact(0), }, ], @@ -466,12 +471,14 @@ mod tests { distinct_count: Precision::Absent, max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))), min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))), + sum_value: Precision::Exact(ScalarValue::Float32(Some(5.5))), null_count: Precision::Absent, }, ColumnStatistics { distinct_count: Precision::Exact(5), max_value: Precision::Exact(ScalarValue::Int64(Some(21))), min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(42))), null_count: Precision::Exact(0), }, ], diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index cfa919425c54..22e5baf95938 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -27,12 +27,12 @@ use std::task::{Context, Poll}; use std::{any::Any, sync::Arc}; use super::{ + execution_mode_from_children, metrics::{ExecutionPlanMetricsSet, MetricsSet}, ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; -use crate::execution_plan::{boundedness_from_children, emission_type_from_children}; use crate::metrics::BaselineMetrics; use crate::stream::ObservedStream; @@ -135,11 +135,14 @@ impl UnionExec { .map(|plan| plan.output_partitioning().partition_count()) .sum(); let output_partitioning = Partitioning::UnknownPartitioning(num_partitions); + + // Determine execution mode: + let mode = execution_mode_from_children(inputs.iter()); + Ok(PlanProperties::new( eq_properties, output_partitioning, - emission_type_from_children(inputs), - boundedness_from_children(inputs), + mode, )) } } @@ -332,12 +335,10 @@ impl InterleaveExec { let eq_properties = EquivalenceProperties::new(schema); // Get output partitioning: let output_partitioning = inputs[0].output_partitioning().clone(); - PlanProperties::new( - eq_properties, - output_partitioning, - emission_type_from_children(inputs), - boundedness_from_children(inputs), - ) + // Determine execution mode: + let mode = execution_mode_from_children(inputs.iter()); + + PlanProperties::new(eq_properties, output_partitioning, mode) } } @@ -571,16 +572,14 @@ impl Stream for CombinedRecordBatchStream { } } -fn col_stats_union( - mut left: ColumnStatistics, - right: ColumnStatistics, -) -> ColumnStatistics { - left.distinct_count = Precision::Absent; - left.min_value = left.min_value.min(&right.min_value); - left.max_value = left.max_value.max(&right.max_value); - left.null_count = left.null_count.add(&right.null_count); - - left +fn col_stats_union(left: ColumnStatistics, right: ColumnStatistics) -> ColumnStatistics { + ColumnStatistics { + null_count: left.null_count.add(&right.null_count), + max_value: left.max_value.max(&right.max_value), + min_value: left.min_value.min(&right.min_value), + sum_value: left.sum_value.add(&right.sum_value), + distinct_count: Precision::Absent, + } } fn stats_union(mut left: Statistics, right: Statistics) -> Statistics { @@ -670,18 +669,21 @@ mod tests { distinct_count: Precision::Exact(5), max_value: Precision::Exact(ScalarValue::Int64(Some(21))), min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(42))), null_count: Precision::Exact(0), }, ColumnStatistics { distinct_count: Precision::Exact(1), max_value: Precision::Exact(ScalarValue::from("x")), min_value: Precision::Exact(ScalarValue::from("a")), + sum_value: Precision::Absent, null_count: Precision::Exact(3), }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))), min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))), + sum_value: Precision::Exact(ScalarValue::Float32(Some(42.0))), null_count: Precision::Absent, }, ], @@ -695,18 +697,21 @@ mod tests { distinct_count: Precision::Exact(3), max_value: Precision::Exact(ScalarValue::Int64(Some(34))), min_value: Precision::Exact(ScalarValue::Int64(Some(1))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(42))), null_count: Precision::Exact(1), }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Exact(ScalarValue::from("c")), min_value: Precision::Exact(ScalarValue::from("b")), + sum_value: Precision::Absent, null_count: Precision::Absent, }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Absent, min_value: Precision::Absent, + sum_value: Precision::Absent, null_count: Precision::Absent, }, ], @@ -721,18 +726,21 @@ mod tests { distinct_count: Precision::Absent, max_value: Precision::Exact(ScalarValue::Int64(Some(34))), min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(84))), null_count: Precision::Exact(1), }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Exact(ScalarValue::from("x")), min_value: Precision::Exact(ScalarValue::from("a")), + sum_value: Precision::Absent, null_count: Precision::Absent, }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Absent, min_value: Precision::Absent, + sum_value: Precision::Absent, null_count: Precision::Absent, }, ], @@ -843,9 +851,9 @@ mod tests { ) { // Check whether orderings are same. let lhs_orderings = lhs.oeq_class(); - let rhs_orderings = rhs.oeq_class(); + let rhs_orderings = &rhs.oeq_class.orderings; assert_eq!(lhs_orderings.len(), rhs_orderings.len(), "{}", err_msg); - for rhs_ordering in rhs_orderings.iter() { + for rhs_ordering in rhs_orderings { assert!(lhs_orderings.contains(rhs_ordering), "{}", err_msg); } } diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index 5089b1e626d4..152e48ed07bf 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -20,8 +20,10 @@ use std::any::Any; use std::sync::Arc; -use super::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics}; -use crate::execution_plan::{Boundedness, EmissionType}; +use super::{ + common, DisplayAs, ExecutionMode, PlanProperties, SendableRecordBatchStream, + Statistics, +}; use crate::{ memory::MemoryStream, ColumnarValue, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, @@ -131,11 +133,12 @@ impl ValuesExec { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. fn compute_properties(schema: SchemaRef) -> PlanProperties { + let eq_properties = EquivalenceProperties::new(schema); + PlanProperties::new( - EquivalenceProperties::new(schema), + eq_properties, Partitioning::UnknownPartitioning(1), - EmissionType::Incremental, - Boundedness::Bounded, + ExecutionMode::Bounded, ) } } @@ -291,6 +294,7 @@ mod tests { distinct_count: Precision::Absent, max_value: Precision::Absent, min_value: Precision::Absent, + sum_value: Precision::Absent, },], } ); diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 6a7dc1604b0a..6a68cd22d9fb 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -421,11 +421,10 @@ message CsvOptions { string timestamp_tz_format = 10; // Optional timestamp with timezone format string time_format = 11; // Optional time format string null_value = 12; // Optional representation of null value - string null_regex = 13; // Optional representation of null loading regex - bytes comment = 14; // Optional comment character as a byte - bytes double_quote = 15; // Indicates if quotes are doubled - bytes newlines_in_values = 16; // Indicates if newlines are supported in values - bytes terminator = 17; // Optional terminator character as a byte + bytes comment = 13; // Optional comment character as a byte + bytes double_quote = 14; // Indicates if quotes are doubled + bytes newlines_in_values = 15; // Indicates if newlines are supported in values + bytes terminator = 16; // Optional terminator character as a byte } // Options controlling CSV format @@ -497,7 +496,6 @@ message ParquetOptions { bool bloom_filter_on_write = 27; // default = false bool schema_force_view_types = 28; // default = false bool binary_as_string = 29; // default = false - bool skip_arrow_metadata = 30; // default = false oneof metadata_size_hint_opt { uint64 metadata_size_hint = 4; @@ -570,6 +568,7 @@ message Statistics { message ColumnStats { Precision min_value = 1; Precision max_value = 2; + Precision sum_value = 5; Precision null_count = 3; Precision distinct_count = 4; } diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index ca8306275b11..4acec1ef54b3 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -694,6 +694,11 @@ impl From<&protobuf::ColumnStats> for ColumnStatistics { } else { Precision::Absent }, + sum_value: if let Some(sum) = &cs.sum_value { + sum.clone().into() + } else { + Precision::Absent + }, distinct_count: if let Some(dc) = &cs.distinct_count { dc.clone().into() } else { @@ -882,8 +887,6 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions { .then(|| proto_opts.time_format.clone()), null_value: (!proto_opts.null_value.is_empty()) .then(|| proto_opts.null_value.clone()), - null_regex: (!proto_opts.null_regex.is_empty()) - .then(|| proto_opts.null_regex.clone()), comment: proto_opts.comment.first().copied(), }) } @@ -962,7 +965,6 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize, schema_force_view_types: value.schema_force_view_types, binary_as_string: value.binary_as_string, - skip_arrow_metadata: value.skip_arrow_metadata, }) } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index e9f9de09d4d1..4ec3703e1da5 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -985,6 +985,9 @@ impl serde::Serialize for ColumnStats { if self.max_value.is_some() { len += 1; } + if self.sum_value.is_some() { + len += 1; + } if self.null_count.is_some() { len += 1; } @@ -998,6 +1001,9 @@ impl serde::Serialize for ColumnStats { if let Some(v) = self.max_value.as_ref() { struct_ser.serialize_field("maxValue", v)?; } + if let Some(v) = self.sum_value.as_ref() { + struct_ser.serialize_field("sumValue", v)?; + } if let Some(v) = self.null_count.as_ref() { struct_ser.serialize_field("nullCount", v)?; } @@ -1018,6 +1024,8 @@ impl<'de> serde::Deserialize<'de> for ColumnStats { "minValue", "max_value", "maxValue", + "sum_value", + "sumValue", "null_count", "nullCount", "distinct_count", @@ -1028,6 +1036,7 @@ impl<'de> serde::Deserialize<'de> for ColumnStats { enum GeneratedField { MinValue, MaxValue, + SumValue, NullCount, DistinctCount, } @@ -1053,6 +1062,7 @@ impl<'de> serde::Deserialize<'de> for ColumnStats { match value { "minValue" | "min_value" => Ok(GeneratedField::MinValue), "maxValue" | "max_value" => Ok(GeneratedField::MaxValue), + "sumValue" | "sum_value" => Ok(GeneratedField::SumValue), "nullCount" | "null_count" => Ok(GeneratedField::NullCount), "distinctCount" | "distinct_count" => Ok(GeneratedField::DistinctCount), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), @@ -1076,6 +1086,7 @@ impl<'de> serde::Deserialize<'de> for ColumnStats { { let mut min_value__ = None; let mut max_value__ = None; + let mut sum_value__ = None; let mut null_count__ = None; let mut distinct_count__ = None; while let Some(k) = map_.next_key()? { @@ -1092,6 +1103,12 @@ impl<'de> serde::Deserialize<'de> for ColumnStats { } max_value__ = map_.next_value()?; } + GeneratedField::SumValue => { + if sum_value__.is_some() { + return Err(serde::de::Error::duplicate_field("sumValue")); + } + sum_value__ = map_.next_value()?; + } GeneratedField::NullCount => { if null_count__.is_some() { return Err(serde::de::Error::duplicate_field("nullCount")); @@ -1109,6 +1126,7 @@ impl<'de> serde::Deserialize<'de> for ColumnStats { Ok(ColumnStats { min_value: min_value__, max_value: max_value__, + sum_value: sum_value__, null_count: null_count__, distinct_count: distinct_count__, }) @@ -1533,9 +1551,6 @@ impl serde::Serialize for CsvOptions { if !self.null_value.is_empty() { len += 1; } - if !self.null_regex.is_empty() { - len += 1; - } if !self.comment.is_empty() { len += 1; } @@ -1597,9 +1612,6 @@ impl serde::Serialize for CsvOptions { if !self.null_value.is_empty() { struct_ser.serialize_field("nullValue", &self.null_value)?; } - if !self.null_regex.is_empty() { - struct_ser.serialize_field("nullRegex", &self.null_regex)?; - } if !self.comment.is_empty() { #[allow(clippy::needless_borrow)] #[allow(clippy::needless_borrows_for_generic_args)] @@ -1650,8 +1662,6 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "timeFormat", "null_value", "nullValue", - "null_regex", - "nullRegex", "comment", "double_quote", "doubleQuote", @@ -1674,7 +1684,6 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { TimestampTzFormat, TimeFormat, NullValue, - NullRegex, Comment, DoubleQuote, NewlinesInValues, @@ -1712,7 +1721,6 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "timestampTzFormat" | "timestamp_tz_format" => Ok(GeneratedField::TimestampTzFormat), "timeFormat" | "time_format" => Ok(GeneratedField::TimeFormat), "nullValue" | "null_value" => Ok(GeneratedField::NullValue), - "nullRegex" | "null_regex" => Ok(GeneratedField::NullRegex), "comment" => Ok(GeneratedField::Comment), "doubleQuote" | "double_quote" => Ok(GeneratedField::DoubleQuote), "newlinesInValues" | "newlines_in_values" => Ok(GeneratedField::NewlinesInValues), @@ -1748,7 +1756,6 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { let mut timestamp_tz_format__ = None; let mut time_format__ = None; let mut null_value__ = None; - let mut null_regex__ = None; let mut comment__ = None; let mut double_quote__ = None; let mut newlines_in_values__ = None; @@ -1837,12 +1844,6 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { } null_value__ = Some(map_.next_value()?); } - GeneratedField::NullRegex => { - if null_regex__.is_some() { - return Err(serde::de::Error::duplicate_field("nullRegex")); - } - null_regex__ = Some(map_.next_value()?); - } GeneratedField::Comment => { if comment__.is_some() { return Err(serde::de::Error::duplicate_field("comment")); @@ -1890,7 +1891,6 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { timestamp_tz_format: timestamp_tz_format__.unwrap_or_default(), time_format: time_format__.unwrap_or_default(), null_value: null_value__.unwrap_or_default(), - null_regex: null_regex__.unwrap_or_default(), comment: comment__.unwrap_or_default(), double_quote: double_quote__.unwrap_or_default(), newlines_in_values: newlines_in_values__.unwrap_or_default(), @@ -4940,9 +4940,6 @@ impl serde::Serialize for ParquetOptions { if self.binary_as_string { len += 1; } - if self.skip_arrow_metadata { - len += 1; - } if self.dictionary_page_size_limit != 0 { len += 1; } @@ -5036,9 +5033,6 @@ impl serde::Serialize for ParquetOptions { if self.binary_as_string { struct_ser.serialize_field("binaryAsString", &self.binary_as_string)?; } - if self.skip_arrow_metadata { - struct_ser.serialize_field("skipArrowMetadata", &self.skip_arrow_metadata)?; - } if self.dictionary_page_size_limit != 0 { #[allow(clippy::needless_borrow)] #[allow(clippy::needless_borrows_for_generic_args)] @@ -5167,8 +5161,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "schemaForceViewTypes", "binary_as_string", "binaryAsString", - "skip_arrow_metadata", - "skipArrowMetadata", "dictionary_page_size_limit", "dictionaryPageSizeLimit", "data_page_row_count_limit", @@ -5212,7 +5204,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { BloomFilterOnWrite, SchemaForceViewTypes, BinaryAsString, - SkipArrowMetadata, DictionaryPageSizeLimit, DataPageRowCountLimit, MaxRowGroupSize, @@ -5262,7 +5253,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "bloomFilterOnWrite" | "bloom_filter_on_write" => Ok(GeneratedField::BloomFilterOnWrite), "schemaForceViewTypes" | "schema_force_view_types" => Ok(GeneratedField::SchemaForceViewTypes), "binaryAsString" | "binary_as_string" => Ok(GeneratedField::BinaryAsString), - "skipArrowMetadata" | "skip_arrow_metadata" => Ok(GeneratedField::SkipArrowMetadata), "dictionaryPageSizeLimit" | "dictionary_page_size_limit" => Ok(GeneratedField::DictionaryPageSizeLimit), "dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit), "maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize), @@ -5310,7 +5300,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut bloom_filter_on_write__ = None; let mut schema_force_view_types__ = None; let mut binary_as_string__ = None; - let mut skip_arrow_metadata__ = None; let mut dictionary_page_size_limit__ = None; let mut data_page_row_count_limit__ = None; let mut max_row_group_size__ = None; @@ -5424,12 +5413,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } binary_as_string__ = Some(map_.next_value()?); } - GeneratedField::SkipArrowMetadata => { - if skip_arrow_metadata__.is_some() { - return Err(serde::de::Error::duplicate_field("skipArrowMetadata")); - } - skip_arrow_metadata__ = Some(map_.next_value()?); - } GeneratedField::DictionaryPageSizeLimit => { if dictionary_page_size_limit__.is_some() { return Err(serde::de::Error::duplicate_field("dictionaryPageSizeLimit")); @@ -5532,7 +5515,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { bloom_filter_on_write: bloom_filter_on_write__.unwrap_or_default(), schema_force_view_types: schema_force_view_types__.unwrap_or_default(), binary_as_string: binary_as_string__.unwrap_or_default(), - skip_arrow_metadata: skip_arrow_metadata__.unwrap_or_default(), dictionary_page_size_limit: dictionary_page_size_limit__.unwrap_or_default(), data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(), max_row_group_size: max_row_group_size__.unwrap_or_default(), diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 3263c1c755af..6cc7652265c0 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -593,20 +593,17 @@ pub struct CsvOptions { /// Optional representation of null value #[prost(string, tag = "12")] pub null_value: ::prost::alloc::string::String, - /// Optional representation of null loading regex - #[prost(string, tag = "13")] - pub null_regex: ::prost::alloc::string::String, /// Optional comment character as a byte - #[prost(bytes = "vec", tag = "14")] + #[prost(bytes = "vec", tag = "13")] pub comment: ::prost::alloc::vec::Vec, /// Indicates if quotes are doubled - #[prost(bytes = "vec", tag = "15")] + #[prost(bytes = "vec", tag = "14")] pub double_quote: ::prost::alloc::vec::Vec, /// Indicates if newlines are supported in values - #[prost(bytes = "vec", tag = "16")] + #[prost(bytes = "vec", tag = "15")] pub newlines_in_values: ::prost::alloc::vec::Vec, /// Optional terminator character as a byte - #[prost(bytes = "vec", tag = "17")] + #[prost(bytes = "vec", tag = "16")] pub terminator: ::prost::alloc::vec::Vec, } /// Options controlling CSV format @@ -763,9 +760,6 @@ pub struct ParquetOptions { /// default = false #[prost(bool, tag = "29")] pub binary_as_string: bool, - /// default = false - #[prost(bool, tag = "30")] - pub skip_arrow_metadata: bool, #[prost(uint64, tag = "12")] pub dictionary_page_size_limit: u64, #[prost(uint64, tag = "18")] @@ -873,6 +867,8 @@ pub struct ColumnStats { pub min_value: ::core::option::Option, #[prost(message, optional, tag = "2")] pub max_value: ::core::option::Option, + #[prost(message, optional, tag = "5")] + pub sum_value: ::core::option::Option, #[prost(message, optional, tag = "3")] pub null_count: ::core::option::Option, #[prost(message, optional, tag = "4")] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 79faaba864f3..8fa1c0dd2ed1 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -748,6 +748,7 @@ impl From<&ColumnStatistics> for protobuf::ColumnStats { protobuf::ColumnStats { min_value: Some(protobuf::Precision::from(&s.min_value)), max_value: Some(protobuf::Precision::from(&s.max_value)), + sum_value: Some(protobuf::Precision::from(&s.sum_value)), null_count: Some(protobuf::Precision::from(&s.null_count)), distinct_count: Some(protobuf::Precision::from(&s.distinct_count)), } @@ -833,7 +834,6 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64, schema_force_view_types: value.schema_force_view_types, binary_as_string: value.binary_as_string, - skip_arrow_metadata: value.skip_arrow_metadata, }) } } @@ -929,7 +929,6 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions { timestamp_tz_format: opts.timestamp_tz_format.clone().unwrap_or_default(), time_format: opts.time_format.clone().unwrap_or_default(), null_value: opts.null_value.clone().unwrap_or_default(), - null_regex: opts.null_regex.clone().unwrap_or_default(), comment: opts.comment.map_or_else(Vec::new, |h| vec![h]), }) } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index 3263c1c755af..6cc7652265c0 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -593,20 +593,17 @@ pub struct CsvOptions { /// Optional representation of null value #[prost(string, tag = "12")] pub null_value: ::prost::alloc::string::String, - /// Optional representation of null loading regex - #[prost(string, tag = "13")] - pub null_regex: ::prost::alloc::string::String, /// Optional comment character as a byte - #[prost(bytes = "vec", tag = "14")] + #[prost(bytes = "vec", tag = "13")] pub comment: ::prost::alloc::vec::Vec, /// Indicates if quotes are doubled - #[prost(bytes = "vec", tag = "15")] + #[prost(bytes = "vec", tag = "14")] pub double_quote: ::prost::alloc::vec::Vec, /// Indicates if newlines are supported in values - #[prost(bytes = "vec", tag = "16")] + #[prost(bytes = "vec", tag = "15")] pub newlines_in_values: ::prost::alloc::vec::Vec, /// Optional terminator character as a byte - #[prost(bytes = "vec", tag = "17")] + #[prost(bytes = "vec", tag = "16")] pub terminator: ::prost::alloc::vec::Vec, } /// Options controlling CSV format @@ -763,9 +760,6 @@ pub struct ParquetOptions { /// default = false #[prost(bool, tag = "29")] pub binary_as_string: bool, - /// default = false - #[prost(bool, tag = "30")] - pub skip_arrow_metadata: bool, #[prost(uint64, tag = "12")] pub dictionary_page_size_limit: u64, #[prost(uint64, tag = "18")] @@ -873,6 +867,8 @@ pub struct ColumnStats { pub min_value: ::core::option::Option, #[prost(message, optional, tag = "2")] pub max_value: ::core::option::Option, + #[prost(message, optional, tag = "5")] + pub sum_value: ::core::option::Option, #[prost(message, optional, tag = "3")] pub null_count: ::core::option::Option, #[prost(message, optional, tag = "4")] From d1d09969986067d9906bb07ff4883f28d475c71e Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 10 Jan 2025 12:37:51 +0000 Subject: [PATCH 2/8] Add sum statistic --- .../proto/datafusion_common.proto | 10 +++--- datafusion/proto-common/src/from_proto/mod.rs | 3 ++ .../proto-common/src/generated/pbjson.rs | 36 +++++++++++++++++++ .../proto-common/src/generated/prost.rs | 14 +++++--- datafusion/proto-common/src/to_proto/mod.rs | 2 ++ .../src/generated/datafusion_proto_common.rs | 14 +++++--- 6 files changed, 67 insertions(+), 12 deletions(-) diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 6a68cd22d9fb..1c2807f390bf 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -421,10 +421,11 @@ message CsvOptions { string timestamp_tz_format = 10; // Optional timestamp with timezone format string time_format = 11; // Optional time format string null_value = 12; // Optional representation of null value - bytes comment = 13; // Optional comment character as a byte - bytes double_quote = 14; // Indicates if quotes are doubled - bytes newlines_in_values = 15; // Indicates if newlines are supported in values - bytes terminator = 16; // Optional terminator character as a byte + string null_regex = 13; // Optional representation of null loading regex + bytes comment = 14; // Optional comment character as a byte + bytes double_quote = 15; // Indicates if quotes are doubled + bytes newlines_in_values = 16; // Indicates if newlines are supported in values + bytes terminator = 17; // Optional terminator character as a byte } // Options controlling CSV format @@ -496,6 +497,7 @@ message ParquetOptions { bool bloom_filter_on_write = 27; // default = false bool schema_force_view_types = 28; // default = false bool binary_as_string = 29; // default = false + bool skip_arrow_metadata = 30; // default = false oneof metadata_size_hint_opt { uint64 metadata_size_hint = 4; diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 4acec1ef54b3..6f1e334e5aca 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -887,6 +887,8 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions { .then(|| proto_opts.time_format.clone()), null_value: (!proto_opts.null_value.is_empty()) .then(|| proto_opts.null_value.clone()), + null_regex: (!proto_opts.null_regex.is_empty()) + .then(|| proto_opts.null_regex.clone()), comment: proto_opts.comment.first().copied(), }) } @@ -965,6 +967,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize, schema_force_view_types: value.schema_force_view_types, binary_as_string: value.binary_as_string, + skip_arrow_metadata: value.skip_arrow_metadata, }) } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index 4ec3703e1da5..40687de098c1 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -1551,6 +1551,9 @@ impl serde::Serialize for CsvOptions { if !self.null_value.is_empty() { len += 1; } + if !self.null_regex.is_empty() { + len += 1; + } if !self.comment.is_empty() { len += 1; } @@ -1612,6 +1615,9 @@ impl serde::Serialize for CsvOptions { if !self.null_value.is_empty() { struct_ser.serialize_field("nullValue", &self.null_value)?; } + if !self.null_regex.is_empty() { + struct_ser.serialize_field("nullRegex", &self.null_regex)?; + } if !self.comment.is_empty() { #[allow(clippy::needless_borrow)] #[allow(clippy::needless_borrows_for_generic_args)] @@ -1662,6 +1668,8 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "timeFormat", "null_value", "nullValue", + "null_regex", + "nullRegex", "comment", "double_quote", "doubleQuote", @@ -1684,6 +1692,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { TimestampTzFormat, TimeFormat, NullValue, + NullRegex, Comment, DoubleQuote, NewlinesInValues, @@ -1721,6 +1730,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "timestampTzFormat" | "timestamp_tz_format" => Ok(GeneratedField::TimestampTzFormat), "timeFormat" | "time_format" => Ok(GeneratedField::TimeFormat), "nullValue" | "null_value" => Ok(GeneratedField::NullValue), + "nullRegex" | "null_regex" => Ok(GeneratedField::NullRegex), "comment" => Ok(GeneratedField::Comment), "doubleQuote" | "double_quote" => Ok(GeneratedField::DoubleQuote), "newlinesInValues" | "newlines_in_values" => Ok(GeneratedField::NewlinesInValues), @@ -1756,6 +1766,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { let mut timestamp_tz_format__ = None; let mut time_format__ = None; let mut null_value__ = None; + let mut null_regex__ = None; let mut comment__ = None; let mut double_quote__ = None; let mut newlines_in_values__ = None; @@ -1844,6 +1855,12 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { } null_value__ = Some(map_.next_value()?); } + GeneratedField::NullRegex => { + if null_regex__.is_some() { + return Err(serde::de::Error::duplicate_field("nullRegex")); + } + null_regex__ = Some(map_.next_value()?); + } GeneratedField::Comment => { if comment__.is_some() { return Err(serde::de::Error::duplicate_field("comment")); @@ -1891,6 +1908,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { timestamp_tz_format: timestamp_tz_format__.unwrap_or_default(), time_format: time_format__.unwrap_or_default(), null_value: null_value__.unwrap_or_default(), + null_regex: null_regex__.unwrap_or_default(), comment: comment__.unwrap_or_default(), double_quote: double_quote__.unwrap_or_default(), newlines_in_values: newlines_in_values__.unwrap_or_default(), @@ -4940,6 +4958,9 @@ impl serde::Serialize for ParquetOptions { if self.binary_as_string { len += 1; } + if self.skip_arrow_metadata { + len += 1; + } if self.dictionary_page_size_limit != 0 { len += 1; } @@ -5033,6 +5054,9 @@ impl serde::Serialize for ParquetOptions { if self.binary_as_string { struct_ser.serialize_field("binaryAsString", &self.binary_as_string)?; } + if self.skip_arrow_metadata { + struct_ser.serialize_field("skipArrowMetadata", &self.skip_arrow_metadata)?; + } if self.dictionary_page_size_limit != 0 { #[allow(clippy::needless_borrow)] #[allow(clippy::needless_borrows_for_generic_args)] @@ -5161,6 +5185,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "schemaForceViewTypes", "binary_as_string", "binaryAsString", + "skip_arrow_metadata", + "skipArrowMetadata", "dictionary_page_size_limit", "dictionaryPageSizeLimit", "data_page_row_count_limit", @@ -5204,6 +5230,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { BloomFilterOnWrite, SchemaForceViewTypes, BinaryAsString, + SkipArrowMetadata, DictionaryPageSizeLimit, DataPageRowCountLimit, MaxRowGroupSize, @@ -5253,6 +5280,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "bloomFilterOnWrite" | "bloom_filter_on_write" => Ok(GeneratedField::BloomFilterOnWrite), "schemaForceViewTypes" | "schema_force_view_types" => Ok(GeneratedField::SchemaForceViewTypes), "binaryAsString" | "binary_as_string" => Ok(GeneratedField::BinaryAsString), + "skipArrowMetadata" | "skip_arrow_metadata" => Ok(GeneratedField::SkipArrowMetadata), "dictionaryPageSizeLimit" | "dictionary_page_size_limit" => Ok(GeneratedField::DictionaryPageSizeLimit), "dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit), "maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize), @@ -5300,6 +5328,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut bloom_filter_on_write__ = None; let mut schema_force_view_types__ = None; let mut binary_as_string__ = None; + let mut skip_arrow_metadata__ = None; let mut dictionary_page_size_limit__ = None; let mut data_page_row_count_limit__ = None; let mut max_row_group_size__ = None; @@ -5413,6 +5442,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } binary_as_string__ = Some(map_.next_value()?); } + GeneratedField::SkipArrowMetadata => { + if skip_arrow_metadata__.is_some() { + return Err(serde::de::Error::duplicate_field("skipArrowMetadata")); + } + skip_arrow_metadata__ = Some(map_.next_value()?); + } GeneratedField::DictionaryPageSizeLimit => { if dictionary_page_size_limit__.is_some() { return Err(serde::de::Error::duplicate_field("dictionaryPageSizeLimit")); @@ -5515,6 +5550,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { bloom_filter_on_write: bloom_filter_on_write__.unwrap_or_default(), schema_force_view_types: schema_force_view_types__.unwrap_or_default(), binary_as_string: binary_as_string__.unwrap_or_default(), + skip_arrow_metadata: skip_arrow_metadata__.unwrap_or_default(), dictionary_page_size_limit: dictionary_page_size_limit__.unwrap_or_default(), data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(), max_row_group_size: max_row_group_size__.unwrap_or_default(), diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 6cc7652265c0..9e4a1ecb6b09 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -593,17 +593,20 @@ pub struct CsvOptions { /// Optional representation of null value #[prost(string, tag = "12")] pub null_value: ::prost::alloc::string::String, + /// Optional representation of null loading regex + #[prost(string, tag = "13")] + pub null_regex: ::prost::alloc::string::String, /// Optional comment character as a byte - #[prost(bytes = "vec", tag = "13")] + #[prost(bytes = "vec", tag = "14")] pub comment: ::prost::alloc::vec::Vec, /// Indicates if quotes are doubled - #[prost(bytes = "vec", tag = "14")] + #[prost(bytes = "vec", tag = "15")] pub double_quote: ::prost::alloc::vec::Vec, /// Indicates if newlines are supported in values - #[prost(bytes = "vec", tag = "15")] + #[prost(bytes = "vec", tag = "16")] pub newlines_in_values: ::prost::alloc::vec::Vec, /// Optional terminator character as a byte - #[prost(bytes = "vec", tag = "16")] + #[prost(bytes = "vec", tag = "17")] pub terminator: ::prost::alloc::vec::Vec, } /// Options controlling CSV format @@ -760,6 +763,9 @@ pub struct ParquetOptions { /// default = false #[prost(bool, tag = "29")] pub binary_as_string: bool, + /// default = false + #[prost(bool, tag = "30")] + pub skip_arrow_metadata: bool, #[prost(uint64, tag = "12")] pub dictionary_page_size_limit: u64, #[prost(uint64, tag = "18")] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 8fa1c0dd2ed1..5dacc6c61f4f 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -834,6 +834,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64, schema_force_view_types: value.schema_force_view_types, binary_as_string: value.binary_as_string, + skip_arrow_metadata: value.skip_arrow_metadata, }) } } @@ -929,6 +930,7 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions { timestamp_tz_format: opts.timestamp_tz_format.clone().unwrap_or_default(), time_format: opts.time_format.clone().unwrap_or_default(), null_value: opts.null_value.clone().unwrap_or_default(), + null_regex: opts.null_regex.clone().unwrap_or_default(), comment: opts.comment.map_or_else(Vec::new, |h| vec![h]), }) } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index 6cc7652265c0..9e4a1ecb6b09 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -593,17 +593,20 @@ pub struct CsvOptions { /// Optional representation of null value #[prost(string, tag = "12")] pub null_value: ::prost::alloc::string::String, + /// Optional representation of null loading regex + #[prost(string, tag = "13")] + pub null_regex: ::prost::alloc::string::String, /// Optional comment character as a byte - #[prost(bytes = "vec", tag = "13")] + #[prost(bytes = "vec", tag = "14")] pub comment: ::prost::alloc::vec::Vec, /// Indicates if quotes are doubled - #[prost(bytes = "vec", tag = "14")] + #[prost(bytes = "vec", tag = "15")] pub double_quote: ::prost::alloc::vec::Vec, /// Indicates if newlines are supported in values - #[prost(bytes = "vec", tag = "15")] + #[prost(bytes = "vec", tag = "16")] pub newlines_in_values: ::prost::alloc::vec::Vec, /// Optional terminator character as a byte - #[prost(bytes = "vec", tag = "16")] + #[prost(bytes = "vec", tag = "17")] pub terminator: ::prost::alloc::vec::Vec, } /// Options controlling CSV format @@ -760,6 +763,9 @@ pub struct ParquetOptions { /// default = false #[prost(bool, tag = "29")] pub binary_as_string: bool, + /// default = false + #[prost(bool, tag = "30")] + pub skip_arrow_metadata: bool, #[prost(uint64, tag = "12")] pub dictionary_page_size_limit: u64, #[prost(uint64, tag = "18")] From 04b7cea1229c7b4a26b9697f0d69c3f7e4a6ce3a Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 10 Jan 2025 12:43:20 +0000 Subject: [PATCH 3/8] Add sum statistic --- datafusion/physical-plan/src/filter.rs | 31 +++- .../physical-plan/src/joins/cross_join.rs | 69 +++++---- datafusion/physical-plan/src/joins/utils.rs | 144 ++++++++++-------- datafusion/physical-plan/src/union.rs | 50 +++--- datafusion/physical-plan/src/values.rs | 8 +- 5 files changed, 171 insertions(+), 131 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index f15336f0edb1..5d8905f073eb 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -45,7 +45,8 @@ use datafusion_physical_expr::expressions::BinaryExpr; use datafusion_physical_expr::intervals::utils::check_support; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{ - analyze, split_conjunction, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, + analyze, split_conjunction, AcrossPartitions, AnalysisContext, ConstExpr, + ExprBoundaries, PhysicalExpr, }; use crate::execution_plan::CardinalityEffect; @@ -218,13 +219,23 @@ impl FilterExec { if binary.op() == &Operator::Eq { // Filter evaluates to single value for all partitions if input_eqs.is_expr_constant(binary.left()) { + let (expr, across_parts) = ( + binary.right(), + input_eqs.get_expr_constant_value(binary.right()), + ); res_constants.push( - ConstExpr::from(binary.right()).with_across_partitions(true), - ) + ConstExpr::new(Arc::clone(expr)) + .with_across_partitions(across_parts), + ); } else if input_eqs.is_expr_constant(binary.right()) { + let (expr, across_parts) = ( + binary.left(), + input_eqs.get_expr_constant_value(binary.left()), + ); res_constants.push( - ConstExpr::from(binary.left()).with_across_partitions(true), - ) + ConstExpr::new(Arc::clone(expr)) + .with_across_partitions(across_parts), + ); } } } @@ -252,8 +263,12 @@ impl FilterExec { .into_iter() .filter(|column| stats.column_statistics[column.index()].is_singleton()) .map(|column| { + let value = stats.column_statistics[column.index()] + .min_value + .get_value(); let expr = Arc::new(column) as _; - ConstExpr::new(expr).with_across_partitions(true) + ConstExpr::new(expr) + .with_across_partitions(AcrossPartitions::Uniform(value.cloned())) }); // This is for statistics eq_properties = eq_properties.with_constants(constants); @@ -272,10 +287,12 @@ impl FilterExec { output_partitioning.project(&projection_mapping, &eq_properties); eq_properties = eq_properties.project(&projection_mapping, out_schema); } + Ok(PlanProperties::new( eq_properties, output_partitioning, - input.execution_mode(), + input.pipeline_behavior(), + input.boundedness(), )) } } diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 9ecd1a6e7f07..698758c27cde 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -19,16 +19,16 @@ //! and producing batches in parallel for the right partitions use super::utils::{ - adjust_right_output_partitioning, BatchSplitter, BatchTransformer, - BuildProbeJoinMetrics, NoopBatchTransformer, OnceAsync, OnceFut, + adjust_right_output_partitioning, reorder_output_after_swap, BatchSplitter, + BatchTransformer, BuildProbeJoinMetrics, NoopBatchTransformer, OnceAsync, OnceFut, StatefulStreamResult, }; use crate::coalesce_partitions::CoalescePartitionsExec; +use crate::execution_plan::{boundedness_from_children, EmissionType}; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::{ - execution_mode_from_children, handle_state, ColumnStatistics, DisplayAs, - DisplayFormatType, Distribution, ExecutionMode, ExecutionPlan, - ExecutionPlanProperties, PlanProperties, RecordBatchStream, + handle_state, ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, + ExecutionPlan, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; use arrow::compute::concat_batches; @@ -161,14 +161,25 @@ impl CrossJoinExec { left.schema().fields.len(), ); - // Determine the execution mode: - let mut mode = execution_mode_from_children([left, right]); - if mode.is_unbounded() { - // If any of the inputs is unbounded, cross join breaks the pipeline. - mode = ExecutionMode::PipelineBreaking; - } + PlanProperties::new( + eq_properties, + output_partitioning, + EmissionType::Final, + boundedness_from_children([left, right]), + ) + } - PlanProperties::new(eq_properties, output_partitioning, mode) + /// Returns a new `ExecutionPlan` that computes the same join as this one, + /// with the left and right inputs swapped using the specified + /// `partition_mode`. + pub fn swap_inputs(&self) -> Result> { + let new_join = + CrossJoinExec::new(Arc::clone(&self.right), Arc::clone(&self.left)); + reorder_output_after_swap( + Arc::new(new_join), + &self.left.schema(), + &self.right.schema(), + ) } } @@ -366,24 +377,22 @@ fn stats_cartesian_product( .map(|row_count| s.sum_value.multiply(&row_count)) .unwrap_or(Precision::Absent), }) - .chain(right_col_stats.into_iter().map(|s| { - ColumnStatistics { - null_count: s.null_count.multiply(&left_row_count), - distinct_count: s.distinct_count, - min_value: s.min_value, - max_value: s.max_value, - sum_value: s - .sum_value - .get_value() - // Cast the row count into the same type as any existing sum value - .and_then(|v| { - Precision::::from(left_row_count) - .cast_to(&v.data_type()) - .ok() - }) - .map(|row_count| s.sum_value.multiply(&row_count)) - .unwrap_or(Precision::Absent), - } + .chain(right_col_stats.into_iter().map(|s| ColumnStatistics { + null_count: s.null_count.multiply(&left_row_count), + distinct_count: s.distinct_count, + min_value: s.min_value, + max_value: s.max_value, + sum_value: s + .sum_value + .get_value() + // Cast the row count into the same type as any existing sum value + .and_then(|v| { + Precision::::from(left_row_count) + .cast_to(&v.data_type()) + .ok() + }) + .map(|row_count| s.sum_value.multiply(&row_count)) + .unwrap_or(Precision::Absent), })) .collect(); diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index fca806c95b71..c321866a91b8 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -29,6 +29,8 @@ use crate::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder}; use crate::{ ColumnStatistics, ExecutionPlan, ExecutionPlanProperties, Partitioning, Statistics, }; +// compatibility +pub use super::join_filter::JoinFilter; use arrow::array::{ downcast_array, new_null_array, Array, BooleanBufferBuilder, UInt32Array, @@ -54,6 +56,7 @@ use datafusion_physical_expr::{ LexOrdering, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, }; +use crate::projection::ProjectionExec; use futures::future::{BoxFuture, Shared}; use futures::{ready, FutureExt}; use hashbrown::raw::RawTable; @@ -549,66 +552,6 @@ pub struct ColumnIndex { pub side: JoinSide, } -/// Filter applied before join output. Fields are crate-public to allow -/// downstream implementations to experiment with custom joins. -#[derive(Debug, Clone)] -pub struct JoinFilter { - /// Filter expression - pub(crate) expression: Arc, - /// Column indices required to construct intermediate batch for filtering - pub(crate) column_indices: Vec, - /// Physical schema of intermediate batch - pub(crate) schema: Schema, -} - -impl JoinFilter { - /// Creates new JoinFilter - pub fn new( - expression: Arc, - column_indices: Vec, - schema: Schema, - ) -> JoinFilter { - JoinFilter { - expression, - column_indices, - schema, - } - } - - /// Helper for building ColumnIndex vector from left and right indices - pub fn build_column_indices( - left_indices: Vec, - right_indices: Vec, - ) -> Vec { - left_indices - .into_iter() - .map(|i| ColumnIndex { - index: i, - side: JoinSide::Left, - }) - .chain(right_indices.into_iter().map(|i| ColumnIndex { - index: i, - side: JoinSide::Right, - })) - .collect() - } - - /// Filter expression - pub fn expression(&self) -> &Arc { - &self.expression - } - - /// Column indices for intermediate batch creation - pub fn column_indices(&self) -> &[ColumnIndex] { - &self.column_indices - } - - /// Intermediate batch schema - pub fn schema(&self) -> &Schema { - &self.schema - } -} - /// Returns the output field given the input field. Outer joins may /// insert nulls even if the input was not null /// @@ -1645,7 +1588,7 @@ macro_rules! handle_state { /// Represents the result of a stateful operation. /// -/// This enumueration indicates whether the state produced a result that is +/// This enumeration indicates whether the state produced a result that is /// ready for use (`Ready`) or if the operation requires continuation (`Continue`). /// /// Variants: @@ -1788,6 +1731,50 @@ impl BatchTransformer for BatchSplitter { } } +/// When the order of the join inputs are changed, the output order of columns +/// must remain the same. +/// +/// Joins output columns from their left input followed by their right input. +/// Thus if the inputs are reordered, the output columns must be reordered to +/// match the original order. +pub(crate) fn reorder_output_after_swap( + plan: Arc, + left_schema: &Schema, + right_schema: &Schema, +) -> Result> { + let proj = ProjectionExec::try_new( + swap_reverting_projection(left_schema, right_schema), + plan, + )?; + Ok(Arc::new(proj)) +} + +/// When the order of the join is changed, the output order of columns must +/// remain the same. +/// +/// Returns the expressions that will allow to swap back the values from the +/// original left as the first columns and those on the right next. +fn swap_reverting_projection( + left_schema: &Schema, + right_schema: &Schema, +) -> Vec<(Arc, String)> { + let right_cols = right_schema.fields().iter().enumerate().map(|(i, f)| { + ( + Arc::new(Column::new(f.name(), i)) as Arc, + f.name().to_owned(), + ) + }); + let right_len = right_cols.len(); + let left_cols = left_schema.fields().iter().enumerate().map(|(i, f)| { + ( + Arc::new(Column::new(f.name(), right_len + i)) as Arc, + f.name().to_owned(), + ) + }); + + left_cols.chain(right_cols).collect() +} + #[cfg(test)] mod tests { use std::pin::Pin; @@ -2755,4 +2742,39 @@ mod tests { assert!(splitter.next().is_none()); assert_split_batches(batches, batch_size, num_rows); } + + #[tokio::test] + async fn test_swap_reverting_projection() { + let left_schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ]); + + let right_schema = Schema::new(vec![Field::new("c", DataType::Int32, false)]); + + let proj = swap_reverting_projection(&left_schema, &right_schema); + + assert_eq!(proj.len(), 3); + + let (col, name) = &proj[0]; + assert_eq!(name, "a"); + assert_col_expr(col, "a", 1); + + let (col, name) = &proj[1]; + assert_eq!(name, "b"); + assert_col_expr(col, "b", 2); + + let (col, name) = &proj[2]; + assert_eq!(name, "c"); + assert_col_expr(col, "c", 0); + } + + fn assert_col_expr(expr: &Arc, name: &str, index: usize) { + let col = expr + .as_any() + .downcast_ref::() + .expect("Projection items should be Column expression"); + assert_eq!(col.name(), name); + assert_eq!(col.index(), index); + } } diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 22e5baf95938..cfa919425c54 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -27,12 +27,12 @@ use std::task::{Context, Poll}; use std::{any::Any, sync::Arc}; use super::{ - execution_mode_from_children, metrics::{ExecutionPlanMetricsSet, MetricsSet}, ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; +use crate::execution_plan::{boundedness_from_children, emission_type_from_children}; use crate::metrics::BaselineMetrics; use crate::stream::ObservedStream; @@ -135,14 +135,11 @@ impl UnionExec { .map(|plan| plan.output_partitioning().partition_count()) .sum(); let output_partitioning = Partitioning::UnknownPartitioning(num_partitions); - - // Determine execution mode: - let mode = execution_mode_from_children(inputs.iter()); - Ok(PlanProperties::new( eq_properties, output_partitioning, - mode, + emission_type_from_children(inputs), + boundedness_from_children(inputs), )) } } @@ -335,10 +332,12 @@ impl InterleaveExec { let eq_properties = EquivalenceProperties::new(schema); // Get output partitioning: let output_partitioning = inputs[0].output_partitioning().clone(); - // Determine execution mode: - let mode = execution_mode_from_children(inputs.iter()); - - PlanProperties::new(eq_properties, output_partitioning, mode) + PlanProperties::new( + eq_properties, + output_partitioning, + emission_type_from_children(inputs), + boundedness_from_children(inputs), + ) } } @@ -572,14 +571,16 @@ impl Stream for CombinedRecordBatchStream { } } -fn col_stats_union(left: ColumnStatistics, right: ColumnStatistics) -> ColumnStatistics { - ColumnStatistics { - null_count: left.null_count.add(&right.null_count), - max_value: left.max_value.max(&right.max_value), - min_value: left.min_value.min(&right.min_value), - sum_value: left.sum_value.add(&right.sum_value), - distinct_count: Precision::Absent, - } +fn col_stats_union( + mut left: ColumnStatistics, + right: ColumnStatistics, +) -> ColumnStatistics { + left.distinct_count = Precision::Absent; + left.min_value = left.min_value.min(&right.min_value); + left.max_value = left.max_value.max(&right.max_value); + left.null_count = left.null_count.add(&right.null_count); + + left } fn stats_union(mut left: Statistics, right: Statistics) -> Statistics { @@ -669,21 +670,18 @@ mod tests { distinct_count: Precision::Exact(5), max_value: Precision::Exact(ScalarValue::Int64(Some(21))), min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), - sum_value: Precision::Exact(ScalarValue::Int64(Some(42))), null_count: Precision::Exact(0), }, ColumnStatistics { distinct_count: Precision::Exact(1), max_value: Precision::Exact(ScalarValue::from("x")), min_value: Precision::Exact(ScalarValue::from("a")), - sum_value: Precision::Absent, null_count: Precision::Exact(3), }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))), min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))), - sum_value: Precision::Exact(ScalarValue::Float32(Some(42.0))), null_count: Precision::Absent, }, ], @@ -697,21 +695,18 @@ mod tests { distinct_count: Precision::Exact(3), max_value: Precision::Exact(ScalarValue::Int64(Some(34))), min_value: Precision::Exact(ScalarValue::Int64(Some(1))), - sum_value: Precision::Exact(ScalarValue::Int64(Some(42))), null_count: Precision::Exact(1), }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Exact(ScalarValue::from("c")), min_value: Precision::Exact(ScalarValue::from("b")), - sum_value: Precision::Absent, null_count: Precision::Absent, }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Absent, min_value: Precision::Absent, - sum_value: Precision::Absent, null_count: Precision::Absent, }, ], @@ -726,21 +721,18 @@ mod tests { distinct_count: Precision::Absent, max_value: Precision::Exact(ScalarValue::Int64(Some(34))), min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), - sum_value: Precision::Exact(ScalarValue::Int64(Some(84))), null_count: Precision::Exact(1), }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Exact(ScalarValue::from("x")), min_value: Precision::Exact(ScalarValue::from("a")), - sum_value: Precision::Absent, null_count: Precision::Absent, }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Absent, min_value: Precision::Absent, - sum_value: Precision::Absent, null_count: Precision::Absent, }, ], @@ -851,9 +843,9 @@ mod tests { ) { // Check whether orderings are same. let lhs_orderings = lhs.oeq_class(); - let rhs_orderings = &rhs.oeq_class.orderings; + let rhs_orderings = rhs.oeq_class(); assert_eq!(lhs_orderings.len(), rhs_orderings.len(), "{}", err_msg); - for rhs_ordering in rhs_orderings { + for rhs_ordering in rhs_orderings.iter() { assert!(lhs_orderings.contains(rhs_ordering), "{}", err_msg); } } diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index 152e48ed07bf..5c45703530df 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -34,6 +34,7 @@ use arrow::record_batch::{RecordBatch, RecordBatchOptions}; use datafusion_common::{internal_err, plan_err, Result, ScalarValue}; use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; +use crate::execution_plan::{Boundedness, EmissionType}; /// Execution plan for values list based relation (produces constant rows) #[derive(Debug, Clone)] @@ -133,12 +134,11 @@ impl ValuesExec { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. fn compute_properties(schema: SchemaRef) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); - PlanProperties::new( - eq_properties, + EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(1), - ExecutionMode::Bounded, + EmissionType::Incremental, + Boundedness::Bounded, ) } } From 2449d2b5d2ad267c7d6db93b5af84c2a50acbe70 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 10 Jan 2025 12:44:43 +0000 Subject: [PATCH 4/8] Add sum statistic --- datafusion/physical-plan/src/union.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index cfa919425c54..9fa0164619a4 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -578,6 +578,7 @@ fn col_stats_union( left.distinct_count = Precision::Absent; left.min_value = left.min_value.min(&right.min_value); left.max_value = left.max_value.max(&right.max_value); + left.sum_value = left.sum_value.add(&right.sum_value); left.null_count = left.null_count.add(&right.null_count); left @@ -670,18 +671,21 @@ mod tests { distinct_count: Precision::Exact(5), max_value: Precision::Exact(ScalarValue::Int64(Some(21))), min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(42))), null_count: Precision::Exact(0), }, ColumnStatistics { distinct_count: Precision::Exact(1), max_value: Precision::Exact(ScalarValue::from("x")), min_value: Precision::Exact(ScalarValue::from("a")), + sum_value: Precision::Absent, null_count: Precision::Exact(3), }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))), min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))), + sum_value: Precision::Exact(ScalarValue::Float32(Some(42.0))), null_count: Precision::Absent, }, ], @@ -695,18 +699,21 @@ mod tests { distinct_count: Precision::Exact(3), max_value: Precision::Exact(ScalarValue::Int64(Some(34))), min_value: Precision::Exact(ScalarValue::Int64(Some(1))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(42))), null_count: Precision::Exact(1), }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Exact(ScalarValue::from("c")), min_value: Precision::Exact(ScalarValue::from("b")), + sum_value: Precision::Absent, null_count: Precision::Absent, }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Absent, min_value: Precision::Absent, + sum_value: Precision::Absent, null_count: Precision::Absent, }, ], @@ -721,18 +728,21 @@ mod tests { distinct_count: Precision::Absent, max_value: Precision::Exact(ScalarValue::Int64(Some(34))), min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(84))), null_count: Precision::Exact(1), }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Exact(ScalarValue::from("x")), min_value: Precision::Exact(ScalarValue::from("a")), + sum_value: Precision::Absent, null_count: Precision::Absent, }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Absent, min_value: Precision::Absent, + sum_value: Precision::Absent, null_count: Precision::Absent, }, ], From fbf318867de18ab4a8b860be6a4a5c6875748dbd Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 10 Jan 2025 12:48:00 +0000 Subject: [PATCH 5/8] Add sum statistic --- .../core/tests/custom_sources_cases/statistics.rs | 14 +++++++------- datafusion/physical-plan/src/values.rs | 7 ++----- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index b937b505bbda..1fd6dfec79fb 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -26,17 +26,18 @@ use datafusion::{ error::Result, logical_expr::Expr, physical_plan::{ - ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, - Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, + ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, + PlanProperties, SendableRecordBatchStream, Statistics, }, prelude::SessionContext, scalar::ScalarValue, }; +use datafusion_catalog::Session; use datafusion_common::{project_schema, stats::Precision}; use datafusion_physical_expr::EquivalenceProperties; +use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; use async_trait::async_trait; -use datafusion_catalog::Session; /// This is a testing structure for statistics /// It will act both as a table provider and execution plan @@ -64,12 +65,11 @@ impl StatisticsValidation { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. fn compute_properties(schema: SchemaRef) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); - PlanProperties::new( - eq_properties, + EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(2), - ExecutionMode::Bounded, + EmissionType::Incremental, + Boundedness::Bounded, ) } } diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index 5c45703530df..dc33dac808d0 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -20,10 +20,8 @@ use std::any::Any; use std::sync::Arc; -use super::{ - common, DisplayAs, ExecutionMode, PlanProperties, SendableRecordBatchStream, - Statistics, -}; +use super::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics}; +use crate::execution_plan::{Boundedness, EmissionType}; use crate::{ memory::MemoryStream, ColumnarValue, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, @@ -34,7 +32,6 @@ use arrow::record_batch::{RecordBatch, RecordBatchOptions}; use datafusion_common::{internal_err, plan_err, Result, ScalarValue}; use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; -use crate::execution_plan::{Boundedness, EmissionType}; /// Execution plan for values list based relation (produces constant rows) #[derive(Debug, Clone)] From 42ac6aab533babb741f306dc13b27a6591a6ab1b Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 10 Jan 2025 12:50:00 +0000 Subject: [PATCH 6/8] Add sum statistic --- datafusion/core/src/datasource/statistics.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs index f81e7bb916de..f02927619a7d 100644 --- a/datafusion/core/src/datasource/statistics.rs +++ b/datafusion/core/src/datasource/statistics.rs @@ -73,7 +73,10 @@ pub async fn get_statistics_with_limit( for (index, file_column) in file_stats.column_statistics.clone().into_iter().enumerate() { - col_stats_set[index] = file_column; + col_stats_set[index].null_count = file_column.null_count; + col_stats_set[index].max_value = file_column.max_value; + col_stats_set[index].min_value = file_column.min_value; + col_stats_set[index].sum_value = file_column.sum_value; } // If the number of rows exceeds the limit, we can stop processing From 87c6a5ccbd47c100e2f087b0c43670f072a709e3 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Sun, 12 Jan 2025 17:28:24 +0000 Subject: [PATCH 7/8] Add tests and Cargo fmt --- datafusion/common/src/stats.rs | 92 ++++++++++++++++++- .../physical-plan/src/joins/cross_join.rs | 34 +++---- datafusion/physical-plan/src/union.rs | 2 +- 3 files changed, 109 insertions(+), 19 deletions(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index 3c7ed0af1a1b..dd8848d24923 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -188,12 +188,12 @@ impl Precision { pub fn sub(&self, other: &Precision) -> Precision { match (self, other) { (Precision::Exact(a), Precision::Exact(b)) => { - a.add(b).map(Precision::Exact).unwrap_or(Precision::Absent) + a.sub(b).map(Precision::Exact).unwrap_or(Precision::Absent) } (Precision::Inexact(a), Precision::Exact(b)) | (Precision::Exact(a), Precision::Inexact(b)) | (Precision::Inexact(a), Precision::Inexact(b)) => a - .add(b) + .sub(b) .map(Precision::Inexact) .unwrap_or(Precision::Absent), (_, _) => Precision::Absent, @@ -623,6 +623,26 @@ mod tests { assert_eq!(precision1.add(&absent_precision), Precision::Absent); } + #[test] + fn test_add_scalar() { + let precision = Precision::Exact(ScalarValue::Int32(Some(42))); + + assert_eq!( + precision.add(&Precision::Exact(ScalarValue::Int32(Some(23)))), + Precision::Exact(ScalarValue::Int32(Some(65))), + ); + assert_eq!( + precision.add(&Precision::Inexact(ScalarValue::Int32(Some(23)))), + Precision::Inexact(ScalarValue::Int32(Some(65))), + ); + assert_eq!( + precision.add(&Precision::Exact(ScalarValue::Int32(None))), + // As per behavior of ScalarValue::add + Precision::Exact(ScalarValue::Int32(None)), + ); + assert_eq!(precision.add(&Precision::Absent), Precision::Absent); + } + #[test] fn test_sub() { let precision1 = Precision::Exact(42); @@ -635,6 +655,26 @@ mod tests { assert_eq!(precision1.sub(&absent_precision), Precision::Absent); } + #[test] + fn test_sub_scalar() { + let precision = Precision::Exact(ScalarValue::Int32(Some(42))); + + assert_eq!( + precision.sub(&Precision::Exact(ScalarValue::Int32(Some(23)))), + Precision::Exact(ScalarValue::Int32(Some(19))), + ); + assert_eq!( + precision.sub(&Precision::Inexact(ScalarValue::Int32(Some(23)))), + Precision::Inexact(ScalarValue::Int32(Some(19))), + ); + assert_eq!( + precision.sub(&Precision::Exact(ScalarValue::Int32(None))), + // As per behavior of ScalarValue::sub + Precision::Exact(ScalarValue::Int32(None)), + ); + assert_eq!(precision.sub(&Precision::Absent), Precision::Absent); + } + #[test] fn test_multiply() { let precision1 = Precision::Exact(6); @@ -648,6 +688,54 @@ mod tests { assert_eq!(precision1.multiply(&absent_precision), Precision::Absent); } + #[test] + fn test_multiply_scalar() { + let precision = Precision::Exact(ScalarValue::Int32(Some(6))); + + assert_eq!( + precision.multiply(&Precision::Exact(ScalarValue::Int32(Some(5)))), + Precision::Exact(ScalarValue::Int32(Some(30))), + ); + assert_eq!( + precision.multiply(&Precision::Inexact(ScalarValue::Int32(Some(5)))), + Precision::Inexact(ScalarValue::Int32(Some(30))), + ); + assert_eq!( + precision.multiply(&Precision::Exact(ScalarValue::Int32(None))), + // As per behavior of ScalarValue::mul_checked + Precision::Exact(ScalarValue::Int32(None)), + ); + assert_eq!(precision.multiply(&Precision::Absent), Precision::Absent); + } + + #[test] + fn test_cast_to() { + // Valid + assert_eq!( + Precision::Exact(ScalarValue::Int32(Some(42))) + .cast_to(&DataType::Int64) + .unwrap(), + Precision::Exact(ScalarValue::Int64(Some(42))), + ); + assert_eq!( + Precision::Inexact(ScalarValue::Int32(Some(42))) + .cast_to(&DataType::Int64) + .unwrap(), + Precision::Inexact(ScalarValue::Int64(Some(42))), + ); + // Null + assert_eq!( + Precision::Exact(ScalarValue::Int32(None)) + .cast_to(&DataType::Int64) + .unwrap(), + Precision::Exact(ScalarValue::Int64(None)), + ); + // Overflow returns error + assert!(Precision::Exact(ScalarValue::Int32(Some(256))) + .cast_to(&DataType::Int8) + .is_err()); + } + #[test] fn test_precision_cloning() { // Precision is copy diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 698758c27cde..bac5cad2a402 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -377,22 +377,24 @@ fn stats_cartesian_product( .map(|row_count| s.sum_value.multiply(&row_count)) .unwrap_or(Precision::Absent), }) - .chain(right_col_stats.into_iter().map(|s| ColumnStatistics { - null_count: s.null_count.multiply(&left_row_count), - distinct_count: s.distinct_count, - min_value: s.min_value, - max_value: s.max_value, - sum_value: s - .sum_value - .get_value() - // Cast the row count into the same type as any existing sum value - .and_then(|v| { - Precision::::from(left_row_count) - .cast_to(&v.data_type()) - .ok() - }) - .map(|row_count| s.sum_value.multiply(&row_count)) - .unwrap_or(Precision::Absent), + .chain(right_col_stats.into_iter().map(|s| { + ColumnStatistics { + null_count: s.null_count.multiply(&left_row_count), + distinct_count: s.distinct_count, + min_value: s.min_value, + max_value: s.max_value, + sum_value: s + .sum_value + .get_value() + // Cast the row count into the same type as any existing sum value + .and_then(|v| { + Precision::::from(left_row_count) + .cast_to(&v.data_type()) + .ok() + }) + .map(|row_count| s.sum_value.multiply(&row_count)) + .unwrap_or(Precision::Absent), + } })) .collect(); diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 9fa0164619a4..5d12c68df08b 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -578,7 +578,7 @@ fn col_stats_union( left.distinct_count = Precision::Absent; left.min_value = left.min_value.min(&right.min_value); left.max_value = left.max_value.max(&right.max_value); - left.sum_value = left.sum_value.add(&right.sum_value); + left.sum_value = left.sum_value.add(&right.sum_value); left.null_count = left.null_count.add(&right.null_count); left From 579e046b3548443b4714845d4732d25f40765f65 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 15 Jan 2025 14:18:59 +0000 Subject: [PATCH 8/8] fix up --- datafusion/physical-plan/src/memory.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 5e8ee713703b..b7403c890fb3 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -875,6 +875,7 @@ mod tests { distinct_count: Precision::Absent, max_value: Precision::Absent, min_value: Precision::Absent, + sum_value: Precision::Absent, },], } );