From 3593fc667e7dd3d4ad1fced674600fed103a47b1 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 14 Nov 2023 14:46:31 +0300 Subject: [PATCH 1/3] Preserve all of the valid orderings during merging. --- .../sort_preserving_repartition_fuzz.rs | 256 +++++++++++++++++- datafusion/physical-expr/src/equivalence.rs | 35 ++- .../physical-plan/src/repartition/mod.rs | 3 - .../src/sorts/sort_preserving_merge.rs | 3 +- datafusion/sqllogictest/test_files/window.slt | 45 +++ 5 files changed, 328 insertions(+), 14 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs index 818698d6c041..a66571937ae9 100644 --- a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs @@ -17,22 +17,268 @@ #[cfg(test)] mod sp_repartition_fuzz_tests { - use arrow::compute::concat_batches; - use arrow_array::{ArrayRef, Int64Array, RecordBatch}; - use arrow_schema::SortOptions; + use arrow::compute::{concat_batches, lexsort, SortColumn}; + use arrow_array::{ArrayRef, Int64Array, RecordBatch, UInt64Array}; + use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions}; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion::physical_plan::{collect, ExecutionPlan, Partitioning}; use datafusion::prelude::SessionContext; + use datafusion_common::Result; use datafusion_execution::config::SessionConfig; - use datafusion_physical_expr::expressions::col; - use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; + use datafusion_execution::memory_pool::MemoryConsumer; + use datafusion_execution::SendableRecordBatchStream; + use datafusion_physical_expr::expressions::{col, Column}; + use datafusion_physical_expr::{ + EquivalenceProperties, PhysicalExpr, PhysicalSortExpr, + }; + use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; + use datafusion_physical_plan::sorts::streaming_merge::streaming_merge; + use datafusion_physical_plan::stream::RecordBatchStreamAdapter; + use itertools::izip; use rand::rngs::StdRng; + use rand::seq::SliceRandom; use rand::{Rng, SeedableRng}; use std::sync::Arc; use test_utils::add_empty_batches; + // Generate a schema which consists of 6 columns (a, b, c, d, e, f) + fn create_test_schema() -> Result { + let a = Field::new("a", DataType::Int32, true); + let b = Field::new("b", DataType::Int32, true); + let c = Field::new("c", DataType::Int32, true); + let d = Field::new("d", DataType::Int32, true); + let e = Field::new("e", DataType::Int32, true); + let f = Field::new("f", DataType::Int32, true); + let schema = Arc::new(Schema::new(vec![a, b, c, d, e, f])); + + Ok(schema) + } + + /// Construct a schema with random ordering + /// among column a, b, c, d + /// where + /// Column [a=f] (e.g they are aliases). + /// Column e is constant. + fn create_random_schema(seed: u64) -> Result<(SchemaRef, EquivalenceProperties)> { + let test_schema = create_test_schema()?; + let col_a = &col("a", &test_schema)?; + let col_b = &col("b", &test_schema)?; + let col_c = &col("c", &test_schema)?; + let col_d = &col("d", &test_schema)?; + let col_e = &col("e", &test_schema)?; + let col_f = &col("f", &test_schema)?; + let col_exprs = [col_a, col_b, col_c, col_d, col_e, col_f]; + + let mut eq_properties = EquivalenceProperties::new(test_schema.clone()); + // Define a and f are aliases + eq_properties.add_equal_conditions(col_a, col_f); + // Column e has constant value. + eq_properties = eq_properties.add_constants([col_e.clone()]); + + // Randomly order columns for sorting + let mut rng = StdRng::seed_from_u64(seed); + let mut remaining_exprs = col_exprs[0..4].to_vec(); // only a, b, c, d are sorted + + let options_asc = SortOptions { + descending: false, + nulls_first: false, + }; + + while !remaining_exprs.is_empty() { + let n_sort_expr = rng.gen_range(0..remaining_exprs.len() + 1); + remaining_exprs.shuffle(&mut rng); + + let ordering = remaining_exprs + .drain(0..n_sort_expr) + .map(|expr| PhysicalSortExpr { + expr: expr.clone(), + options: options_asc, + }) + .collect(); + + eq_properties.add_new_orderings([ordering]); + } + + Ok((test_schema, eq_properties)) + } + + // If we already generated a random result for one of the + // expressions in the equivalence classes. For other expressions in the same + // equivalence class use same result. This util gets already calculated result, when available. + fn get_representative_arr( + eq_group: &[Arc], + existing_vec: &[Option], + schema: SchemaRef, + ) -> Option { + for expr in eq_group.iter() { + let col = expr.as_any().downcast_ref::().unwrap(); + let (idx, _field) = schema.column_with_name(col.name()).unwrap(); + if let Some(res) = &existing_vec[idx] { + return Some(res.clone()); + } + } + None + } + + // Generate a table that satisfies the given equivalence properties; i.e. + // equivalences, ordering equivalences, and constants. + fn generate_table_for_eq_properties( + eq_properties: &EquivalenceProperties, + n_elem: usize, + n_distinct: usize, + ) -> Result { + let mut rng = StdRng::seed_from_u64(23); + + let schema = eq_properties.schema(); + let mut schema_vec = vec![None; schema.fields.len()]; + + // Utility closure to generate random array + let mut generate_random_array = |num_elems: usize, max_val: usize| -> ArrayRef { + let values: Vec = (0..num_elems) + .map(|_| rng.gen_range(0..max_val) as u64) + .collect(); + Arc::new(UInt64Array::from_iter_values(values)) + }; + + // Fill constant columns + for constant in eq_properties.constants() { + let col = constant.as_any().downcast_ref::().unwrap(); + let (idx, _field) = schema.column_with_name(col.name()).unwrap(); + let arr = + Arc::new(UInt64Array::from_iter_values(vec![0; n_elem])) as ArrayRef; + schema_vec[idx] = Some(arr); + } + + // Fill columns based on ordering equivalences + for ordering in eq_properties.oeq_class().iter() { + let (sort_columns, indices): (Vec<_>, Vec<_>) = ordering + .iter() + .map(|PhysicalSortExpr { expr, options }| { + let col = expr.as_any().downcast_ref::().unwrap(); + let (idx, _field) = schema.column_with_name(col.name()).unwrap(); + let arr = generate_random_array(n_elem, n_distinct); + ( + SortColumn { + values: arr, + options: Some(*options), + }, + idx, + ) + }) + .unzip(); + + let sort_arrs = arrow::compute::lexsort(&sort_columns, None)?; + for (idx, arr) in izip!(indices, sort_arrs) { + schema_vec[idx] = Some(arr); + } + } + + // Fill columns based on equivalence groups + for eq_group in eq_properties.eq_group().iter() { + let representative_array = + get_representative_arr(eq_group, &schema_vec, schema.clone()) + .unwrap_or_else(|| generate_random_array(n_elem, n_distinct)); + + for expr in eq_group { + let col = expr.as_any().downcast_ref::().unwrap(); + let (idx, _field) = schema.column_with_name(col.name()).unwrap(); + schema_vec[idx] = Some(representative_array.clone()); + } + } + + let res: Vec<_> = schema_vec + .into_iter() + .zip(schema.fields.iter()) + .map(|(elem, field)| { + ( + field.name(), + // Generate random values for columns that do not occur in any of the groups (equivalence, ordering equivalence, constants) + elem.unwrap_or_else(|| generate_random_array(n_elem, n_distinct)), + ) + }) + .collect(); + + Ok(RecordBatch::try_from_iter(res)?) + } + + // This test checks for whether during sort preserving merge we can preserve all of the valid orderings + // successfully. If at the input we have orderings [a ASC, b ASC], [c ASC, d ASC] + // After sort preserving merge orderings [a ASC, b ASC], [c ASC, d ASC] should still be valid. + #[tokio::test] + async fn stream_merge_multi_order_preserve() -> Result<()> { + const N_PARTITION: usize = 8; + const N_ELEM: usize = 25; + const N_DISTINCT: usize = 5; + const N_DIFF_SCHEMA: usize = 20; + + use datafusion::physical_plan::common::collect; + for seed in 0..N_DIFF_SCHEMA { + // Create a schema with random equivalence properties + let (_test_schema, eq_properties) = create_random_schema(seed as u64)?; + let table_data_with_properties = + generate_table_for_eq_properties(&eq_properties, N_ELEM, N_DISTINCT)?; + let schema = table_data_with_properties.schema(); + let streams: Vec = (0..N_PARTITION) + .map(|_idx| { + let batch = table_data_with_properties.clone(); + Box::pin(RecordBatchStreamAdapter::new( + schema.clone(), + futures::stream::once(async { Ok(batch) }), + )) as SendableRecordBatchStream + }) + .collect::>(); + + // Returns concatenated version of the all available orderings + let exprs = eq_properties + .oeq_class() + .output_ordering() + .unwrap_or_default(); + + let context = SessionContext::new().task_ctx(); + let mem_reservation = + MemoryConsumer::new("test".to_string()).register(context.memory_pool()); + + // Internally SortPreservingMergeExec uses this function for merging. + let res = streaming_merge( + streams, + schema, + &exprs, + BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0), + 1, + None, + mem_reservation, + )?; + let res = collect(res).await?; + // Contains the merged result. + let res = concat_batches(&res[0].schema(), &res)?; + + for ordering in eq_properties.oeq_class().iter() { + let err_msg = format!("error in eq properties: {:?}", eq_properties); + let sort_solumns = ordering + .iter() + .map(|sort_expr| sort_expr.evaluate_to_sort_column(&res)) + .collect::>>()?; + let orig_columns = sort_solumns + .iter() + .map(|sort_column| sort_column.values.clone()) + .collect::>(); + let sorted_columns = lexsort(&sort_solumns, None)?; + + // Make sure after merging ordering is still valid. + assert_eq!(orig_columns.len(), sorted_columns.len(), "{}", err_msg); + assert!( + izip!(orig_columns.into_iter(), sorted_columns.into_iter()) + .all(|(lhs, rhs)| { lhs == rhs }), + "{}", + err_msg + ) + } + } + Ok(()) + } + #[tokio::test(flavor = "multi_thread", worker_threads = 8)] async fn sort_preserving_repartition_test() { let seed_start = 0; diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index d8aa09b90460..6fc86bd37575 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -133,7 +133,7 @@ impl EquivalenceGroup { } /// Returns an iterator over the equivalence classes in this group. - fn iter(&self) -> impl Iterator { + pub fn iter(&self) -> impl Iterator { self.classes.iter() } @@ -470,6 +470,19 @@ pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement { output } +/// This function constructs a duplicate-free `LexOrdering` by filtering out +/// duplicate entries that have same physical expression inside. For example, +/// `vec![a Asc, a Desc]` collapses to `vec![a Asc]`. +pub fn collapse_lex_ordering(input: LexOrdering) -> LexOrdering { + let mut output = Vec::::new(); + for item in input { + if !output.iter().any(|req| req.expr.eq(&item.expr)) { + output.push(item); + } + } + output +} + /// An `OrderingEquivalenceClass` object keeps track of different alternative /// orderings than can describe a schema. For example, consider the following table: /// @@ -575,10 +588,19 @@ impl OrderingEquivalenceClass { } } - /// Gets the first ordering entry in this ordering equivalence class. - /// This is one of the many valid orderings (if there are multiple). + /// Gets the concatenated version of the all orderings. + /// if orderings are [a ASC, b ASC] and [c ASC, d ASC] + /// Returns [a ASC, b ASC, c ASC, d ASC]. + /// This ensures that during during merging + /// [a ASC, b ASC] and [c ASC, d ASC] are still valid. pub fn output_ordering(&self) -> Option { - self.orderings.first().cloned() + let output_ordering = self + .orderings + .iter() + .flat_map(|ordering| ordering.to_vec()) + .collect::>(); + let output_ordering = collapse_lex_ordering(output_ordering); + (!output_ordering.is_empty()).then_some(output_ordering) } // Append orderings in `other` to all existing orderings in this equivalence @@ -733,6 +755,11 @@ impl EquivalenceProperties { &self.eq_group } + /// Returns a reference to the constant expressions + pub fn constants(&self) -> &[Arc] { + &self.constants + } + /// Returns the normalized version of the ordering equivalence class within. /// Normalization removes constants and duplicates as well as standardizing /// expressions according to the equivalence group within. diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 9719446d78d7..24f227d8a535 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -472,9 +472,6 @@ impl ExecutionPlan for RepartitionExec { if !self.maintains_input_order()[0] { result.clear_orderings(); } - if self.preserve_order { - result = result.with_reorder(self.sort_exprs().unwrap_or_default().to_vec()) - } result } diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 65cd8e41480e..f4b57e8bfb45 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -174,8 +174,7 @@ impl ExecutionPlan for SortPreservingMergeExec { } fn equivalence_properties(&self) -> EquivalenceProperties { - let output_oeq = self.input.equivalence_properties(); - output_oeq.with_reorder(self.expr.to_vec()) + self.input.equivalence_properties() } fn children(&self) -> Vec> { diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 2eb0576d559b..8be02b846cda 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -3396,6 +3396,21 @@ WITH ORDER (a ASC, b ASC) WITH ORDER (c ASC) LOCATION '../core/tests/data/window_2.csv'; +# Create an unbounded source where there is multiple orderings. +statement ok +CREATE UNBOUNDED EXTERNAL TABLE multiple_ordered_table_inf ( + a0 INTEGER, + a INTEGER, + b INTEGER, + c INTEGER, + d INTEGER +) +STORED AS CSV +WITH HEADER ROW +WITH ORDER (a ASC, b ASC) +WITH ORDER (c ASC) +LOCATION '../core/tests/data/window_2.csv'; + # All of the window execs in the physical plan should work in the # sorted mode. query TT @@ -3477,3 +3492,33 @@ query II select sum(1) over() x, sum(1) over () y ---- 1 1 + +statement ok +set datafusion.execution.target_partitions = 2; + +# source is ordered by [a ASC, b ASC], [c ASC] +# after sort preserving repartition and sort preserving merge +# we should still have the orderings [a ASC, b ASC], [c ASC]. +query TT +EXPLAIN SELECT *, + AVG(d) OVER sliding_window AS avg_d +FROM multiple_ordered_table_inf +WINDOW sliding_window AS ( + PARTITION BY d + ORDER BY a RANGE 10 PRECEDING +) +ORDER BY c +---- +logical_plan +Sort: multiple_ordered_table_inf.c ASC NULLS LAST +--Projection: multiple_ordered_table_inf.a0, multiple_ordered_table_inf.a, multiple_ordered_table_inf.b, multiple_ordered_table_inf.c, multiple_ordered_table_inf.d, AVG(multiple_ordered_table_inf.d) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW AS avg_d +----WindowAggr: windowExpr=[[AVG(CAST(multiple_ordered_table_inf.d AS Float64)) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW]] +------TableScan: multiple_ordered_table_inf projection=[a0, a, b, c, d] +physical_plan +SortPreservingMergeExec: [c@3 ASC NULLS LAST] +--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, AVG(multiple_ordered_table_inf.d) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW@5 as avg_d] +----BoundedWindowAggExec: wdw=[AVG(multiple_ordered_table_inf.d) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW: Ok(Field { name: "AVG(multiple_ordered_table_inf.d) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: CurrentRow }], mode=[Linear] +------CoalesceBatchesExec: target_batch_size=4096 +--------SortPreservingRepartitionExec: partitioning=Hash([d@4], 2), input_partitions=2, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC NULLS LAST +----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], has_header=true From d094af62bcfa7af36f7d4eb01b7922ee8a44476e Mon Sep 17 00:00:00 2001 From: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> Date: Tue, 14 Nov 2023 15:28:46 +0300 Subject: [PATCH 2/3] Update datafusion/physical-expr/src/equivalence.rs Co-authored-by: Mehmet Ozan Kabak --- datafusion/physical-expr/src/equivalence.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 6fc86bd37575..66922c51ec88 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -588,11 +588,8 @@ impl OrderingEquivalenceClass { } } - /// Gets the concatenated version of the all orderings. - /// if orderings are [a ASC, b ASC] and [c ASC, d ASC] - /// Returns [a ASC, b ASC, c ASC, d ASC]. - /// This ensures that during during merging - /// [a ASC, b ASC] and [c ASC, d ASC] are still valid. + /// Returns the concatenation of all the orderings. This enables merge + /// operations to preserve all equivalent orderings simultaneously. pub fn output_ordering(&self) -> Option { let output_ordering = self .orderings From 1b371e67e77de13915a131653a97540066cafd88 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 14 Nov 2023 15:42:10 +0300 Subject: [PATCH 3/3] Address reviews --- .../sort_preserving_repartition_fuzz.rs | 36 ++++++++++--------- datafusion/physical-expr/src/equivalence.rs | 11 +++--- 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs index a66571937ae9..5bc29ba1c277 100644 --- a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs @@ -17,32 +17,36 @@ #[cfg(test)] mod sp_repartition_fuzz_tests { + use std::sync::Arc; + use arrow::compute::{concat_batches, lexsort, SortColumn}; use arrow_array::{ArrayRef, Int64Array, RecordBatch, UInt64Array}; use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions}; - use datafusion::physical_plan::memory::MemoryExec; - use datafusion::physical_plan::repartition::RepartitionExec; - use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; - use datafusion::physical_plan::{collect, ExecutionPlan, Partitioning}; + + use datafusion::physical_plan::{ + collect, + memory::MemoryExec, + metrics::{BaselineMetrics, ExecutionPlanMetricsSet}, + repartition::RepartitionExec, + sorts::sort_preserving_merge::SortPreservingMergeExec, + sorts::streaming_merge::streaming_merge, + stream::RecordBatchStreamAdapter, + ExecutionPlan, Partitioning, + }; use datafusion::prelude::SessionContext; use datafusion_common::Result; - use datafusion_execution::config::SessionConfig; - use datafusion_execution::memory_pool::MemoryConsumer; - use datafusion_execution::SendableRecordBatchStream; - use datafusion_physical_expr::expressions::{col, Column}; + use datafusion_execution::{ + config::SessionConfig, memory_pool::MemoryConsumer, SendableRecordBatchStream, + }; use datafusion_physical_expr::{ + expressions::{col, Column}, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr, }; - use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; - use datafusion_physical_plan::sorts::streaming_merge::streaming_merge; - use datafusion_physical_plan::stream::RecordBatchStreamAdapter; - use itertools::izip; - use rand::rngs::StdRng; - use rand::seq::SliceRandom; - use rand::{Rng, SeedableRng}; - use std::sync::Arc; use test_utils::add_empty_batches; + use itertools::izip; + use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng}; + // Generate a schema which consists of 6 columns (a, b, c, d, e, f) fn create_test_schema() -> Result { let a = Field::new("a", DataType::Int32, true); diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 66922c51ec88..fa564049a0f0 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -459,7 +459,7 @@ impl EquivalenceGroup { /// This function constructs a duplicate-free `LexOrderingReq` by filtering out /// duplicate entries that have same physical expression inside. For example, -/// `vec![a Some(Asc), a Some(Desc)]` collapses to `vec![a Some(Asc)]`. +/// `vec![a Some(ASC), a Some(DESC)]` collapses to `vec![a Some(ASC)]`. pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement { let mut output = Vec::::new(); for item in input { @@ -472,7 +472,7 @@ pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement { /// This function constructs a duplicate-free `LexOrdering` by filtering out /// duplicate entries that have same physical expression inside. For example, -/// `vec![a Asc, a Desc]` collapses to `vec![a Asc]`. +/// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`. pub fn collapse_lex_ordering(input: LexOrdering) -> LexOrdering { let mut output = Vec::::new(); for item in input { @@ -591,11 +591,8 @@ impl OrderingEquivalenceClass { /// Returns the concatenation of all the orderings. This enables merge /// operations to preserve all equivalent orderings simultaneously. pub fn output_ordering(&self) -> Option { - let output_ordering = self - .orderings - .iter() - .flat_map(|ordering| ordering.to_vec()) - .collect::>(); + let output_ordering = + self.orderings.iter().flatten().cloned().collect::>(); let output_ordering = collapse_lex_ordering(output_ordering); (!output_ordering.is_empty()).then_some(output_ordering) }