Description
Is your feature request related to a problem or challenge?
While upgrading IOx to use #8006, I found that the `EnforceSorting` pass was adding a sort node to an `ExecutionPlan`, even when the input to that plan was sorted correctly and `ExecutionPlan::maintains_input_order` returned true. Prior to #8006, it did not add the sort.
The issue is that our node did not also correctly report its `EquivalenceProperties`, which the `EnforceSorting` pass now relies on.
Thus, I think we are in a situation where:
- the combination of `output_ordering` and `equivalence_properties` is used by `EnforceSorting`, while
- `maintains_input_order` is used in other places.

This means implementers of `ExecutionPlan` have to keep these three methods in sync, which I think is confusing and error-prone.
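To make the sync problem concrete, here is a toy Rust model. `Node`, `SortedInput`, `Dedup`, `equivalence_ordering` and `sort_needed` are hypothetical simplifications, not DataFusion's API: orderings are plain lists of column names, and the flag is a single `bool` rather than a `Vec<bool>`. A node that overrides `maintains_input_order` and `output_ordering` but not the equivalence information is reported as order-preserving by the flag, yet an ordering-based check still asks for a sort.

```rust
// Toy model: every type and method here is a hypothetical simplification,
// not DataFusion's real `ExecutionPlan` API.

/// Sort key as a list of column names, outermost first.
type Ordering = Vec<&'static str>;

trait Node {
    /// Stand-in for `output_ordering`.
    fn output_ordering(&self) -> Option<Ordering> {
        None
    }
    /// Stand-in for the ordering carried by `equivalence_properties`.
    /// Default: no ordering information at all.
    fn equivalence_ordering(&self) -> Option<Ordering> {
        None
    }
    /// Stand-in for `maintains_input_order` (single child, so a plain bool).
    fn maintains_input_order(&self) -> bool {
        false
    }
}

/// Input already sorted on (tag1, time) that reports it consistently.
struct SortedInput;

impl Node for SortedInput {
    fn output_ordering(&self) -> Option<Ordering> {
        Some(vec!["tag1", "time"])
    }
    fn equivalence_ordering(&self) -> Option<Ordering> {
        self.output_ordering()
    }
}

/// Order-preserving node that overrides two of the methods but forgets
/// `equivalence_ordering`, which therefore falls back to `None`.
struct Dedup {
    input: Box<dyn Node>,
}

impl Node for Dedup {
    fn output_ordering(&self) -> Option<Ordering> {
        self.input.output_ordering()
    }
    fn maintains_input_order(&self) -> bool {
        true
    }
}

/// Toy ordering check: it consults `output_ordering` and the equivalence
/// information, but never the `maintains_input_order` flag.
fn sort_needed(node: &dyn Node, required: &[&'static str]) -> bool {
    let satisfies = |o: Option<Ordering>| o.map_or(false, |o| o.starts_with(required));
    !(satisfies(node.output_ordering()) && satisfies(node.equivalence_ordering()))
}

fn main() {
    let plan = Dedup { input: Box::new(SortedInput) };
    let required = ["tag1", "time"];
    // The flag claims the order is preserved...
    assert!(plan.maintains_input_order());
    // ...yet the ordering-based check still asks for a sort.
    assert!(sort_needed(&plan, &required));
    println!("sort needed: {}", sort_needed(&plan, &required));
}
```

Overriding `equivalence_ordering` in `Dedup` to forward `self.input.equivalence_ordering()` makes `sort_needed` return false, which mirrors the real fix described in the example.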
Describe the solution you'd like
I propose we deprecate `maintains_input_order` (with a hint about equivalence classes to help other users) and use only `output_ordering` and `equivalence_properties` to determine whether the order is maintained.
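A minimal sketch of what the proposal could look like, again as a toy model with hypothetical names (`Node::ordering`, `Dedup`, `Unordered`, `order_maintained`) rather than DataFusion's API: each node reports its ordering through a single method, and "maintains input order" becomes a derived check instead of a separately overridden flag, so there is nothing left to keep in sync.

```rust
// Hypothetical sketch of the proposal: there is no separate flag, and
// "maintains input order" is derived from the ordering a node reports.
// None of these types are DataFusion's real API.

/// Sort key as a list of column names, outermost first.
type Ordering = Vec<&'static str>;

trait Node {
    /// Single source of truth, standing in for `output_ordering` plus
    /// `equivalence_properties`. Default: no known ordering.
    fn ordering(&self) -> Option<Ordering> {
        None
    }
}

/// Input sorted on (tag1, time, __chunk_order).
struct SortedInput;

impl Node for SortedInput {
    fn ordering(&self) -> Option<Ordering> {
        Some(vec!["tag1", "time", "__chunk_order"])
    }
}

/// Order-preserving node: it only has to forward its input's ordering.
struct Dedup {
    input: Box<dyn Node>,
}

impl Node for Dedup {
    fn ordering(&self) -> Option<Ordering> {
        self.input.ordering()
    }
}

/// Node that keeps the default and therefore reports no ordering.
struct Unordered;

impl Node for Unordered {}

/// Derived replacement for `maintains_input_order`: true when the node still
/// guarantees every sort column its input guaranteed, in the same order.
fn order_maintained(node: &dyn Node, input: &dyn Node) -> bool {
    match (node.ordering(), input.ordering()) {
        (Some(out), Some(inp)) => out.starts_with(&inp),
        // Missing ordering on either side: conservatively "not maintained".
        _ => false,
    }
}

fn main() {
    let dedup = Dedup { input: Box::new(SortedInput) };
    assert!(order_maintained(&dedup, &*dedup.input));
    assert!(!order_maintained(&Unordered, &SortedInput));
    println!("dedup maintains order: {}", order_maintained(&dedup, &*dedup.input));
}
```

The design choice here is that an implementer overrides one method and cannot produce the contradictory answers possible when the flag and the reported ordering are independent.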
Describe alternatives you've considered
No response
Additional context
Example
```rust
impl ExecutionPlan for DeduplicateExec {
    ...
    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true]
    }
    ...
}
```
For a plan like this:
```
ProjectionExec: expr=[field_int@1 as field_int, tag1@2 as tag1, time@3 as time]
  DeduplicateExec: [tag1@2 ASC,time@3 ASC]
    SortExec: expr=[tag1@2 ASC,time@3 ASC,__chunk_order@0 ASC]
      RecordBatchesExec: chunks=1, projection=[__chunk_order, field_int, tag1, time]
```
Prior to #8006
No sort is added:
```
2023-11-10T11:30:52.620358Z TRACE log: Optimized physical plan by EnforceSorting:
OutputRequirementExec
  ProjectionExec: expr=[field_int@1 as field_int, tag1@2 as tag1, time@3 as time]
    DeduplicateExec: [tag1@2 ASC,time@3 ASC]
      SortExec: expr=[tag1@2 ASC,time@3 ASC,__chunk_order@0 ASC]
        RecordBatchesExec: chunks=1, projection=[__chunk_order, field_int, tag1, time]
```
After #8006
The `EnforceSorting` rule adds a `SortExec` at the top:
```
2023-11-10T11:29:45.120962Z TRACE log: Optimized physical plan by EnforceSorting:
OutputRequirementExec
  SortExec: expr=[tag1@1 ASC,time@2 ASC]
    ProjectionExec: expr=[field_int@1 as field_int, tag1@2 as tag1, time@3 as time]
      DeduplicateExec: [tag1@2 ASC,time@3 ASC]
        SortExec: expr=[tag1@2 ASC,time@3 ASC,__chunk_order@0 ASC]
          RecordBatchesExec: chunks=1, projection=[__chunk_order, field_int, tag1, time]
```
Adding `equivalence_properties` fixed the problem:
```rust
impl ExecutionPlan for DeduplicateExec {
    ...
    fn equivalence_properties(&self) -> EquivalenceProperties {
        // deduplicate does not change the equivalence properties
        self.input.equivalence_properties()
    }
    ...
}
```