Skip to content

Commit 2950334

Browse files
rdettai and Dandandan authored
Move CBOs and Statistics to physical plan (#965)
* moved statistics method from logical to exec plan * [feat] make statistics async * [feat] fix tests with partial implem of AggregateStatistics optimizer rule * [lint] cargo fmt all * [fix] better structure for optimizer implem also fixed some clippy lint * [test] add tests for aggregate_statistics optim * [feat] add back min max stat optim * [feat] add back hash_build_probe_order optim * [fix] align on new compound name rule * [test] unit hash_build_probe_order optim * [test] union statistics compute * [test] stats for record batch helper * [test] statistics column projection * [test] stat computing for various plans * [test] adding some integ tests for stats * [test] sanity check for window expr * [test] window stat check len * [fix] suggestion about bool arithmetic Co-authored-by: Daniël Heres <danielheres@gmail.com> * [fix] should never commit from github * [review] for loop to iterator following @Dandandan's comment * [test] unit test for join optim * [fix] updated comments according to PR review * [fix] doc comments according to review hints Modifications according to comments from @alamb * [test] adding back previously asserted cases * [fix] running aggregate and join CBOs first in accordance with @alamb's review Co-authored-by: Daniël Heres <danielheres@gmail.com>
1 parent c2f26ca commit 2950334

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+2457
-1130
lines changed

ballista/rust/core/proto/ballista.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ message Statistics {
275275
int64 num_rows = 1;
276276
int64 total_byte_size = 2;
277277
repeated ColumnStats column_stats = 3;
278+
bool is_exact = 4;
278279
}
279280

280281
message PartitionedFile {

ballista/rust/core/src/datasource.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::{any::Any, sync::Arc};
2020
use datafusion::arrow::datatypes::SchemaRef;
2121
use datafusion::error::Result as DFResult;
2222
use datafusion::{
23-
datasource::{datasource::Statistics, TableProvider},
23+
datasource::TableProvider,
2424
logical_plan::{Expr, LogicalPlan},
2525
physical_plan::ExecutionPlan,
2626
};
@@ -61,12 +61,4 @@ impl TableProvider for DfTableAdapter {
6161
) -> DFResult<Arc<dyn ExecutionPlan>> {
6262
Ok(self.plan.clone())
6363
}
64-
65-
fn statistics(&self) -> Statistics {
66-
Statistics {
67-
num_rows: None,
68-
total_byte_size: None,
69-
column_statistics: None,
70-
}
71-
}
7264
}

ballista/rust/core/src/execution_plans/distributed_query.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use datafusion::error::{DataFusionError, Result};
3535
use datafusion::logical_plan::LogicalPlan;
3636
use datafusion::physical_plan::{
3737
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
38-
SendableRecordBatchStream,
38+
SendableRecordBatchStream, Statistics,
3939
};
4040

4141
use async_trait::async_trait;
@@ -203,6 +203,13 @@ impl ExecutionPlan for DistributedQueryExec {
203203
}
204204
}
205205
}
206+
207+
fn statistics(&self) -> Statistics {
208+
// This execution plan sends the logical plan to the scheduler without
209+
// performing the node by node conversion to a full physical plan.
210+
// This implies that we cannot infer the statistics at this stage.
211+
Statistics::default()
212+
}
206213
}
207214

208215
async fn fetch_partition(

ballista/rust/core/src/execution_plans/shuffle_reader.rs

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::{any::Any, pin::Pin};
2121

2222
use crate::client::BallistaClient;
2323
use crate::memory_stream::MemoryStream;
24-
use crate::serde::scheduler::PartitionLocation;
24+
use crate::serde::scheduler::{PartitionLocation, PartitionStats};
2525

2626
use crate::utils::WrappedStream;
2727
use async_trait::async_trait;
@@ -31,7 +31,9 @@ use datafusion::arrow::record_batch::RecordBatch;
3131
use datafusion::physical_plan::metrics::{
3232
ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
3333
};
34-
use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Metric, Partitioning};
34+
use datafusion::physical_plan::{
35+
DisplayFormatType, ExecutionPlan, Metric, Partitioning, Statistics,
36+
};
3537
use datafusion::{
3638
error::{DataFusionError, Result},
3739
physical_plan::RecordBatchStream,
@@ -156,6 +158,38 @@ impl ExecutionPlan for ShuffleReaderExec {
156158
fn metrics(&self) -> Option<MetricsSet> {
157159
Some(self.metrics.clone_inner())
158160
}
161+
162+
fn statistics(&self) -> Statistics {
163+
stats_for_partitions(
164+
self.partition
165+
.iter()
166+
.flatten()
167+
.map(|loc| loc.partition_stats),
168+
)
169+
}
170+
}
171+
172+
fn stats_for_partitions(
173+
partition_stats: impl Iterator<Item = PartitionStats>,
174+
) -> Statistics {
175+
// TODO stats: add column statistics to PartitionStats
176+
partition_stats.fold(
177+
Statistics {
178+
is_exact: true,
179+
num_rows: Some(0),
180+
total_byte_size: Some(0),
181+
column_statistics: None,
182+
},
183+
|mut acc, part| {
184+
// if any statistic is unknown, it makes the entire statistic unknown
185+
acc.num_rows = acc.num_rows.zip(part.num_rows).map(|(a, b)| a + b as usize);
186+
acc.total_byte_size = acc
187+
.total_byte_size
188+
.zip(part.num_bytes)
189+
.map(|(a, b)| a + b as usize);
190+
acc
191+
},
192+
)
159193
}
160194

161195
async fn fetch_partition(
@@ -177,3 +211,76 @@ async fn fetch_partition(
177211
.await
178212
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?)
179213
}
214+
215+
#[cfg(test)]
216+
mod tests {
217+
use super::*;
218+
219+
#[tokio::test]
220+
async fn test_stats_for_partitions_empty() {
221+
let result = stats_for_partitions(std::iter::empty());
222+
223+
let exptected = Statistics {
224+
is_exact: true,
225+
num_rows: Some(0),
226+
total_byte_size: Some(0),
227+
column_statistics: None,
228+
};
229+
230+
assert_eq!(result, exptected);
231+
}
232+
233+
#[tokio::test]
234+
async fn test_stats_for_partitions_full() {
235+
let part_stats = vec![
236+
PartitionStats {
237+
num_rows: Some(10),
238+
num_bytes: Some(84),
239+
num_batches: Some(1),
240+
},
241+
PartitionStats {
242+
num_rows: Some(4),
243+
num_bytes: Some(65),
244+
num_batches: None,
245+
},
246+
];
247+
248+
let result = stats_for_partitions(part_stats.into_iter());
249+
250+
let exptected = Statistics {
251+
is_exact: true,
252+
num_rows: Some(14),
253+
total_byte_size: Some(149),
254+
column_statistics: None,
255+
};
256+
257+
assert_eq!(result, exptected);
258+
}
259+
260+
#[tokio::test]
261+
async fn test_stats_for_partitions_missing() {
262+
let part_stats = vec![
263+
PartitionStats {
264+
num_rows: Some(10),
265+
num_bytes: Some(84),
266+
num_batches: Some(1),
267+
},
268+
PartitionStats {
269+
num_rows: None,
270+
num_bytes: None,
271+
num_batches: None,
272+
},
273+
];
274+
275+
let result = stats_for_partitions(part_stats.into_iter());
276+
277+
let exptected = Statistics {
278+
is_exact: true,
279+
num_rows: None,
280+
total_byte_size: None,
281+
column_statistics: None,
282+
};
283+
284+
assert_eq!(result, exptected);
285+
}
286+
}

ballista/rust/core/src/execution_plans/shuffle_writer.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ use datafusion::physical_plan::metrics::{
5151
use datafusion::physical_plan::repartition::RepartitionExec;
5252
use datafusion::physical_plan::Partitioning::RoundRobinBatch;
5353
use datafusion::physical_plan::{
54-
DisplayFormatType, ExecutionPlan, Metric, Partitioning, RecordBatchStream,
54+
DisplayFormatType, ExecutionPlan, Metric, Partitioning, RecordBatchStream, Statistics,
5555
};
5656
use futures::StreamExt;
5757
use hashbrown::HashMap;
@@ -417,6 +417,10 @@ impl ExecutionPlan for ShuffleWriterExec {
417417
}
418418
}
419419
}
420+
421+
fn statistics(&self) -> Statistics {
422+
self.plan.statistics()
423+
}
420424
}
421425

422426
fn result_schema() -> SchemaRef {

ballista/rust/core/src/execution_plans/unresolved_shuffle.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ use crate::serde::scheduler::PartitionLocation;
2323

2424
use async_trait::async_trait;
2525
use datafusion::arrow::datatypes::SchemaRef;
26-
use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
26+
use datafusion::physical_plan::{
27+
DisplayFormatType, ExecutionPlan, Partitioning, Statistics,
28+
};
2729
use datafusion::{
2830
error::{DataFusionError, Result},
2931
physical_plan::RecordBatchStream,
@@ -117,4 +119,10 @@ impl ExecutionPlan for UnresolvedShuffleExec {
117119
}
118120
}
119121
}
122+
123+
fn statistics(&self) -> Statistics {
124+
// The full statistics are computed in the `ShuffleReaderExec` node
125+
// that replaces this one once the previous stage is completed.
126+
Statistics::default()
127+
}
120128
}

ballista/rust/core/src/serde/logical_plan/from_proto.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,7 @@ impl TryInto<Statistics> for &protobuf::Statistics {
359359
num_rows: Some(self.num_rows as usize),
360360
total_byte_size: Some(self.total_byte_size as usize),
361361
column_statistics: Some(column_statistics),
362+
is_exact: self.is_exact,
362363
})
363364
}
364365
}
@@ -1177,8 +1178,7 @@ impl TryInto<Field> for &protobuf::Field {
11771178
}
11781179

11791180
use crate::serde::protobuf::ColumnStats;
1180-
use datafusion::datasource::datasource::{ColumnStatistics, Statistics};
1181-
use datafusion::physical_plan::{aggregates, windows};
1181+
use datafusion::physical_plan::{aggregates, windows, ColumnStatistics, Statistics};
11821182
use datafusion::prelude::{
11831183
array, date_part, date_trunc, length, lower, ltrim, md5, rtrim, sha224, sha256,
11841184
sha384, sha512, trim, upper,

ballista/rust/core/src/serde/logical_plan/to_proto.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use crate::serde::{protobuf, BallistaError};
2525
use datafusion::arrow::datatypes::{
2626
DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit,
2727
};
28-
use datafusion::datasource::datasource::{ColumnStatistics, Statistics};
2928
use datafusion::datasource::{CsvFile, PartitionedFile, TableDescriptor};
3029
use datafusion::logical_plan::{
3130
window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
@@ -36,6 +35,7 @@ use datafusion::physical_plan::functions::BuiltinScalarFunction;
3635
use datafusion::physical_plan::window_functions::{
3736
BuiltInWindowFunction, WindowFunction,
3837
};
38+
use datafusion::physical_plan::{ColumnStatistics, Statistics};
3939
use datafusion::{datasource::parquet::ParquetTable, logical_plan::exprlist_to_fields};
4040
use protobuf::{
4141
arrow_type, logical_expr_node::ExprType, scalar_type, DateUnit, PrimitiveScalarType,
@@ -278,6 +278,7 @@ impl From<&Statistics> for protobuf::Statistics {
278278
num_rows: s.num_rows.map(|n| n as i64).unwrap_or(none_value),
279279
total_byte_size: s.total_byte_size.map(|n| n as i64).unwrap_or(none_value),
280280
column_stats,
281+
is_exact: s.is_exact,
281282
}
282283
}
283284
}

ballista/rust/core/src/serde/physical_plan/from_proto.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
3434
use datafusion::catalog::catalog::{
3535
CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
3636
};
37-
use datafusion::datasource::datasource::Statistics;
3837
use datafusion::datasource::object_store::ObjectStoreRegistry;
3938
use datafusion::datasource::FilePartition;
4039
use datafusion::execution::context::{
@@ -74,7 +73,9 @@ use datafusion::physical_plan::{
7473
sort::{SortExec, SortOptions},
7574
Partitioning,
7675
};
77-
use datafusion::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, WindowExpr};
76+
use datafusion::physical_plan::{
77+
AggregateExpr, ExecutionPlan, PhysicalExpr, Statistics, WindowExpr,
78+
};
7879
use datafusion::prelude::CsvReadOptions;
7980
use log::debug;
8081
use protobuf::physical_expr_node::ExprType;

ballista/rust/executor/src/collect.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use datafusion::arrow::{
2828
};
2929
use datafusion::error::DataFusionError;
3030
use datafusion::physical_plan::{
31-
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
31+
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
3232
};
3333
use datafusion::{error::Result, physical_plan::RecordBatchStream};
3434
use futures::stream::SelectAll;
@@ -116,6 +116,10 @@ impl ExecutionPlan for CollectExec {
116116
}
117117
}
118118
}
119+
120+
fn statistics(&self) -> Statistics {
121+
self.plan.statistics()
122+
}
119123
}
120124

121125
struct MergedRecordBatchStream {

datafusion/src/datasource/csv.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ use std::io::{Read, Seek};
3939
use std::string::String;
4040
use std::sync::{Arc, Mutex};
4141

42-
use crate::datasource::datasource::Statistics;
4342
use crate::datasource::{Source, TableProvider};
4443
use crate::error::{DataFusionError, Result};
4544
use crate::logical_plan::Expr;
@@ -54,7 +53,6 @@ pub struct CsvFile {
5453
has_header: bool,
5554
delimiter: u8,
5655
file_extension: String,
57-
statistics: Statistics,
5856
}
5957

6058
impl CsvFile {
@@ -82,7 +80,6 @@ impl CsvFile {
8280
has_header: options.has_header,
8381
delimiter: options.delimiter,
8482
file_extension: String::from(options.file_extension),
85-
statistics: Statistics::default(),
8683
})
8784
}
8885

@@ -105,7 +102,6 @@ impl CsvFile {
105102
schema,
106103
has_header: options.has_header,
107104
delimiter: options.delimiter,
108-
statistics: Statistics::default(),
109105
file_extension: String::new(),
110106
})
111107
}
@@ -133,7 +129,6 @@ impl CsvFile {
133129
schema,
134130
has_header: options.has_header,
135131
delimiter: options.delimiter,
136-
statistics: Statistics::default(),
137132
file_extension: String::new(),
138133
})
139134
}
@@ -210,10 +205,6 @@ impl TableProvider for CsvFile {
210205
};
211206
Ok(Arc::new(exec))
212207
}
213-
214-
fn statistics(&self) -> Statistics {
215-
self.statistics.clone()
216-
}
217208
}
218209

219210
#[cfg(test)]

0 commit comments

Comments
 (0)