Skip to content

Commit d37351a

Browse files
authored
fix: from_plan generate Agg/Window can be with different schema. (#6820)
* fix: from_plan generate Agg can be with different schema. * fix: from_plan generate Window can be with different schema.
1 parent 4a0b65d commit d37351a

File tree

3 files changed

+36
-29
lines changed

3 files changed

+36
-29
lines changed

datafusion/expr/src/logical_plan/builder.rs

+5-10
Original file line numberDiff line numberDiff line change
@@ -825,17 +825,11 @@ impl LogicalPlanBuilder {
825825
window_expr: impl IntoIterator<Item = impl Into<Expr>>,
826826
) -> Result<Self> {
827827
let window_expr = normalize_cols(window_expr, &self.plan)?;
828-
let all_expr = window_expr.iter();
829-
validate_unique_names("Windows", all_expr.clone())?;
830-
let mut window_fields: Vec<DFField> = self.plan.schema().fields().clone();
831-
window_fields.extend_from_slice(&exprlist_to_fields(all_expr, &self.plan)?);
832-
let metadata = self.plan.schema().metadata().clone();
833-
834-
Ok(Self::from(LogicalPlan::Window(Window {
835-
input: Arc::new(self.plan),
828+
validate_unique_names("Windows", &window_expr)?;
829+
Ok(Self::from(LogicalPlan::Window(Window::try_new(
836830
window_expr,
837-
schema: Arc::new(DFSchema::new_with_metadata(window_fields, metadata)?),
838-
})))
831+
Arc::new(self.plan),
832+
)?)))
839833
}
840834

841835
/// Apply an aggregate: grouping on the `group_expr` expressions
@@ -1229,6 +1223,7 @@ pub fn project(
12291223
plan: LogicalPlan,
12301224
expr: impl IntoIterator<Item = impl Into<Expr>>,
12311225
) -> Result<LogicalPlan> {
1226+
// TODO: move it into analyzer
12321227
let input_schema = plan.schema();
12331228
let mut projected_expr = vec![];
12341229
for e in expr {

datafusion/expr/src/logical_plan/plan.rs

+18-2
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ use datafusion_common::tree_node::{
3535
Transformed, TreeNode, TreeNodeVisitor, VisitRecursion,
3636
};
3737
use datafusion_common::{
38-
plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference,
39-
Result, ScalarValue,
38+
plan_err, Column, DFField, DFSchema, DFSchemaRef, DataFusionError,
39+
OwnedTableReference, Result, ScalarValue,
4040
};
4141
use std::collections::{HashMap, HashSet};
4242
use std::fmt::{self, Debug, Display, Formatter};
@@ -1400,6 +1400,22 @@ pub struct Window {
14001400
pub schema: DFSchemaRef,
14011401
}
14021402

1403+
impl Window {
1404+
/// Create a new window operator.
1405+
pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
1406+
let mut window_fields: Vec<DFField> = input.schema().fields().clone();
1407+
window_fields
1408+
.extend_from_slice(&exprlist_to_fields(window_expr.iter(), input.as_ref())?);
1409+
let metadata = input.schema().metadata().clone();
1410+
1411+
Ok(Window {
1412+
input,
1413+
window_expr,
1414+
schema: Arc::new(DFSchema::new_with_metadata(window_fields, metadata)?),
1415+
})
1416+
}
1417+
}
1418+
14031419
/// Produces rows from a table provider by reference or from the context
14041420
#[derive(Clone)]
14051421
pub struct TableScan {

datafusion/expr/src/utils.rs

+13-17
Original file line numberDiff line numberDiff line change
@@ -818,23 +818,19 @@ pub fn from_plan(
818818
input: Arc::new(inputs[0].clone()),
819819
})),
820820
},
821-
LogicalPlan::Window(Window {
822-
window_expr,
823-
schema,
824-
..
825-
}) => Ok(LogicalPlan::Window(Window {
826-
input: Arc::new(inputs[0].clone()),
827-
window_expr: expr[0..window_expr.len()].to_vec(),
828-
schema: schema.clone(),
829-
})),
830-
LogicalPlan::Aggregate(Aggregate {
831-
group_expr, schema, ..
832-
}) => Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
833-
Arc::new(inputs[0].clone()),
834-
expr[0..group_expr.len()].to_vec(),
835-
expr[group_expr.len()..].to_vec(),
836-
schema.clone(),
837-
)?)),
821+
LogicalPlan::Window(Window { window_expr, .. }) => {
822+
Ok(LogicalPlan::Window(Window::try_new(
823+
expr[0..window_expr.len()].to_vec(),
824+
Arc::new(inputs[0].clone()),
825+
)?))
826+
}
827+
LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
828+
Ok(LogicalPlan::Aggregate(Aggregate::try_new(
829+
Arc::new(inputs[0].clone()),
830+
expr[0..group_expr.len()].to_vec(),
831+
expr[group_expr.len()..].to_vec(),
832+
)?))
833+
}
838834
LogicalPlan::Sort(SortPlan { fetch, .. }) => Ok(LogicalPlan::Sort(SortPlan {
839835
expr: expr.to_vec(),
840836
input: Arc::new(inputs[0].clone()),

0 commit comments

Comments
 (0)