|
| 1 | +// Licensed to the Apache Software Foundation (ASF) under one |
| 2 | +// or more contributor license agreements. See the NOTICE file |
| 3 | +// distributed with this work for additional information |
| 4 | +// regarding copyright ownership. The ASF licenses this file |
| 5 | +// to you under the Apache License, Version 2.0 (the |
| 6 | +// "License"); you may not use this file except in compliance |
| 7 | +// with the License. You may obtain a copy of the License at |
| 8 | +// |
| 9 | +// http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | +// |
| 11 | +// Unless required by applicable law or agreed to in writing, |
| 12 | +// software distributed under the License is distributed on an |
| 13 | +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | +// KIND, either express or implied. See the License for the |
| 15 | +// specific language governing permissions and limitations |
| 16 | +// under the License. |
| 17 | + |
| 18 | +use datafusion::error::Result; |
| 19 | +use datafusion::execution::SessionStateBuilder; |
| 20 | +use datafusion::physical_optimizer::enforce_sorting::EnforceSorting; |
| 21 | +use datafusion::physical_optimizer::limit_pushdown::LimitPushdown; |
| 22 | +use datafusion::physical_optimizer::projection_pushdown::ProjectionPushdown; |
| 23 | +use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan; |
| 24 | +use datafusion::physical_optimizer::{ |
| 25 | + coalesce_batches::CoalesceBatches, enforce_distribution::EnforceDistribution, |
| 26 | + output_requirements::OutputRequirements, PhysicalOptimizerRule, |
| 27 | +}; |
| 28 | +use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; |
| 29 | +use datafusion::prelude::*; |
| 30 | +use futures::StreamExt; |
| 31 | +use std::sync::Arc; |
| 32 | + |
| 33 | +#[tokio::test] |
| 34 | +async fn apply_enforce_distribution_multiple_times() -> Result<()> { |
| 35 | + // Create a configuration |
| 36 | + let config = SessionConfig::new(); |
| 37 | + let ctx = SessionContext::new_with_config(config); |
| 38 | + |
| 39 | + // Create table schema and data |
| 40 | + // To reproduce to bug: the LOCATION should contain more than one aggregate_test_100.csv |
| 41 | + let sql = "CREATE EXTERNAL TABLE aggregate_test_100 ( |
| 42 | + c1 VARCHAR NOT NULL, |
| 43 | + c2 TINYINT NOT NULL, |
| 44 | + c3 SMALLINT NOT NULL, |
| 45 | + c4 SMALLINT, |
| 46 | + c5 INT, |
| 47 | + c6 BIGINT NOT NULL, |
| 48 | + c7 SMALLINT NOT NULL, |
| 49 | + c8 INT NOT NULL, |
| 50 | + c9 BIGINT UNSIGNED NOT NULL, |
| 51 | + c10 VARCHAR NOT NULL, |
| 52 | + c11 FLOAT NOT NULL, |
| 53 | + c12 DOUBLE NOT NULL, |
| 54 | + c13 VARCHAR NOT NULL |
| 55 | + ) |
| 56 | + STORED AS CSV |
| 57 | + LOCATION '../../testing/data/csv/aggregate_test_100.csv' |
| 58 | + OPTIONS ('format.has_header' 'true')"; |
| 59 | + |
| 60 | + ctx.sql(sql).await?; |
| 61 | + |
| 62 | + let df = ctx.sql("SELECT * FROM(SELECT * FROM aggregate_test_100 UNION ALL SELECT * FROM aggregate_test_100) ORDER BY c13 LIMIT 5").await?; |
| 63 | + let logical_plan = df.logical_plan().clone(); |
| 64 | + let analyzed_logical_plan = ctx.state().analyzer().execute_and_check( |
| 65 | + logical_plan, |
| 66 | + ctx.state().config_options(), |
| 67 | + |_, _| (), |
| 68 | + )?; |
| 69 | + |
| 70 | + let optimized_logical_plan = ctx.state().optimizer().optimize( |
| 71 | + analyzed_logical_plan, |
| 72 | + &ctx.state(), |
| 73 | + |_, _| (), |
| 74 | + )?; |
| 75 | + |
| 76 | + let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![ |
| 77 | + Arc::new(OutputRequirements::new_add_mode()), |
| 78 | + Arc::new(EnforceDistribution::new()), |
| 79 | + Arc::new(EnforceSorting::new()), |
| 80 | + Arc::new(ProjectionPushdown::new()), |
| 81 | + Arc::new(CoalesceBatches::new()), |
| 82 | + Arc::new(EnforceDistribution::new()), // -- Add enforce distribution rule again |
| 83 | + Arc::new(OutputRequirements::new_remove_mode()), |
| 84 | + Arc::new(ProjectionPushdown::new()), |
| 85 | + Arc::new(LimitPushdown::new()), |
| 86 | + Arc::new(SanityCheckPlan::new()), |
| 87 | + ]; |
| 88 | + |
| 89 | + let planner = DefaultPhysicalPlanner::default(); |
| 90 | + let session_state = SessionStateBuilder::new() |
| 91 | + .with_config(ctx.copied_config()) |
| 92 | + .with_default_features() |
| 93 | + .with_physical_optimizer_rules(optimizers) |
| 94 | + .build(); |
| 95 | + let optimized_physical_plan = planner |
| 96 | + .create_physical_plan(&optimized_logical_plan, &session_state) |
| 97 | + .await?; |
| 98 | + |
| 99 | + let mut results = optimized_physical_plan |
| 100 | + .execute(0, ctx.task_ctx().clone()) |
| 101 | + .unwrap(); |
| 102 | + |
| 103 | + let batch = results.next().await.unwrap()?; |
| 104 | + // With the fix of https://github.com/apache/datafusion/pull/14207, the number of rows will be 10 |
| 105 | + assert_eq!(batch.num_rows(), 5); |
| 106 | + Ok(()) |
| 107 | +} |
0 commit comments