Skip to content

Commit 1c608e4

Browse files
committed
refactor: add test cases for orthogonal sorting, and remove 1 unneeded conditional
1 parent 820e08b commit 1c608e4

File tree

3 files changed

+72
-20
lines changed

3 files changed

+72
-20
lines changed

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ use crate::physical_optimizer::test_utils::{
2121
aggregate_exec, bounded_window_exec, check_integrity, coalesce_batches_exec,
2222
coalesce_partitions_exec, create_test_schema, create_test_schema2,
2323
create_test_schema3, filter_exec, global_limit_exec, hash_join_exec, limit_exec,
24-
local_limit_exec, memory_exec, parquet_exec, repartition_exec, sort_exec, sort_expr,
25-
sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec,
26-
sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered,
27-
union_exec, RequirementsTestExec,
24+
local_limit_exec, memory_exec, parquet_exec, repartition_exec, sort_exec,
25+
sort_exec_with_fetch, sort_expr, sort_expr_options, sort_merge_join_exec,
26+
sort_preserving_merge_exec, sort_preserving_merge_exec_with_fetch,
27+
spr_repartition_exec, stream_exec_ordered, union_exec, RequirementsTestExec,
2828
};
2929

3030
use arrow::compute::SortOptions;
@@ -3346,3 +3346,58 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
33463346

33473347
Ok(())
33483348
}
3349+
3350+
#[test]
3351+
fn test_removes_unused_orthogonal_sort() -> Result<()> {
3352+
let schema = create_test_schema3()?;
3353+
let input_sort_exprs = vec![sort_expr("b", &schema), sort_expr("c", &schema)];
3354+
let unbounded_input = stream_exec_ordered(&schema, input_sort_exprs.clone());
3355+
3356+
let orthogonal_sort = sort_exec(vec![sort_expr("a", &schema)], unbounded_input);
3357+
let output_sort = sort_exec(input_sort_exprs, orthogonal_sort); // same sort as data source
3358+
3359+
// Test scenario/input has an orthogonal sort:
3360+
let expected_input = [
3361+
"SortExec: expr=[b@1 ASC, c@2 ASC], preserve_partitioning=[false]",
3362+
" SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
3363+
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]"
3364+
];
3365+
assert_eq!(get_plan_string(&output_sort), expected_input,);
3366+
3367+
// Test: should remove orthogonal sort, and the uppermost (unneeded) sort:
3368+
let expected_optimized = [
3369+
"StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]"
3370+
];
3371+
assert_optimized!(expected_input, expected_optimized, output_sort, true);
3372+
3373+
Ok(())
3374+
}
3375+
3376+
#[test]
3377+
fn test_keeps_used_orthogonal_sort() -> Result<()> {
3378+
let schema = create_test_schema3()?;
3379+
let input_sort_exprs = vec![sort_expr("b", &schema), sort_expr("c", &schema)];
3380+
let unbounded_input = stream_exec_ordered(&schema, input_sort_exprs.clone());
3381+
3382+
let orthogonal_sort =
3383+
sort_exec_with_fetch(vec![sort_expr("a", &schema)], Some(3), unbounded_input);
3384+
let output_sort = sort_exec(input_sort_exprs, orthogonal_sort); // same sort as data source
3385+
3386+
// Test scenario/input has an orthogonal sort:
3387+
let expected_input = [
3388+
"SortExec: expr=[b@1 ASC, c@2 ASC], preserve_partitioning=[false]",
3389+
" SortExec: TopK(fetch=3), expr=[a@0 ASC], preserve_partitioning=[false]",
3390+
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]"
3391+
];
3392+
assert_eq!(get_plan_string(&output_sort), expected_input,);
3393+
3394+
// Test: should keep the orthogonal sort, since it modifies the output:
3395+
let expected_optimized = [
3396+
"SortExec: expr=[b@1 ASC, c@2 ASC], preserve_partitioning=[false]",
3397+
" SortExec: TopK(fetch=3), expr=[a@0 ASC], preserve_partitioning=[false]",
3398+
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]",
3399+
];
3400+
assert_optimized!(expected_input, expected_optimized, output_sort, true);
3401+
3402+
Ok(())
3403+
}

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,9 +295,17 @@ pub fn coalesce_batches_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn Execution
295295
pub fn sort_exec(
296296
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
297297
input: Arc<dyn ExecutionPlan>,
298+
) -> Arc<dyn ExecutionPlan> {
299+
sort_exec_with_fetch(sort_exprs, None, input)
300+
}
301+
302+
pub fn sort_exec_with_fetch(
303+
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
304+
fetch: Option<usize>,
305+
input: Arc<dyn ExecutionPlan>,
298306
) -> Arc<dyn ExecutionPlan> {
299307
let sort_exprs = sort_exprs.into_iter().collect();
300-
Arc::new(SortExec::new(sort_exprs, input))
308+
Arc::new(SortExec::new(sort_exprs, input).with_fetch(fetch))
301309
}
302310

303311
/// A test [`ExecutionPlan`] whose requirements can be configured.

datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -127,22 +127,11 @@ fn pushdown_sorts_helper(
127127
sort_push_down =
128128
add_sort_above(sort_push_down, parent_reqs, parent_req_fetch);
129129

130-
// If we have a totally orthogonal sort (2 different sorts in a row), that means the child sort
131-
// gets immediately re-sorted.
132-
// e.g. Sort col1 ASC
133-
// Sort col1 DESC
134-
//
135-
// Remove this redundant sort by not pushing down.
136-
let is_orthogonal_sort =
137-
!satisfy_parent && !parent_is_stricter && !current_is_stricter;
138-
139130
// make pushdown requirements be the new ones.
140-
if !is_orthogonal_sort || current_sort_fetch.is_some() {
141-
sort_push_down.children[0].data = ParentRequirements {
142-
ordering_requirement: Some(new_reqs),
143-
fetch: current_sort_fetch,
144-
};
145-
}
131+
sort_push_down.children[0].data = ParentRequirements {
132+
ordering_requirement: Some(new_reqs),
133+
fetch: current_sort_fetch,
134+
};
146135
} else {
147136
// Don't add a SortExec
148137
// Do update what sort requirements to keep pushing down

0 commit comments

Comments
 (0)