Skip to content

Minor: provide default implementation for ExecutionPlan::statistics #7911

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{
project_schema, DisplayAs, DisplayFormatType, ExecutionPlan,
SendableRecordBatchStream, Statistics,
SendableRecordBatchStream,
};
use datafusion::prelude::*;
use datafusion_expr::{Expr, LogicalPlanBuilder};
Expand Down Expand Up @@ -270,8 +270,4 @@ impl ExecutionPlan for CustomExec {
None,
)?))
}

fn statistics(&self) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema()))
}
}
8 changes: 1 addition & 7 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2057,9 +2057,7 @@ mod tests {
use super::*;
use crate::datasource::file_format::options::CsvReadOptions;
use crate::datasource::MemTable;
use crate::physical_plan::{
expressions, DisplayFormatType, Partitioning, Statistics,
};
use crate::physical_plan::{expressions, DisplayFormatType, Partitioning};
use crate::physical_plan::{DisplayAs, SendableRecordBatchStream};
use crate::physical_planner::PhysicalPlanner;
use crate::prelude::{SessionConfig, SessionContext};
Expand Down Expand Up @@ -2670,10 +2668,6 @@ mod tests {
) -> Result<SendableRecordBatchStream> {
unimplemented!("NoOpExecutionPlan::execute");
}

fn statistics(&self) -> Result<Statistics> {
unimplemented!("NoOpExecutionPlan::statistics");
}
}

// Produces an execution plan where the schema is mismatched from
Expand Down
6 changes: 1 addition & 5 deletions datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::prelude::{CsvReadOptions, SessionContext};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::{Statistics, TableReference};
use datafusion_common::TableReference;
use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_physical_expr::PhysicalSortExpr;

Expand Down Expand Up @@ -238,10 +238,6 @@ impl ExecutionPlan for UnboundedExec {
batch: self.batch.clone(),
}))
}

fn statistics(&self) -> Result<Statistics> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole point of this PR is to provide this as a default implementation

Ok(Statistics::new_unknown(&self.schema()))
}
}

#[derive(Debug)]
Expand Down
7 changes: 1 addition & 6 deletions datafusion/physical-plan/src/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use super::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
use super::{DisplayAs, Distribution, SendableRecordBatchStream};

use crate::display::DisplayableExecutionPlan;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics};
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};

use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::{internal_err, DataFusionError, Result};
Expand Down Expand Up @@ -195,11 +195,6 @@ impl ExecutionPlan for AnalyzeExec {
futures::stream::once(output),
)))
}

fn statistics(&self) -> Result<Statistics> {
// Statistics of an ANALYZE plan are not relevant
Ok(Statistics::new_unknown(&self.schema()))
}
}

/// Creates the output of AnalyzeExec as a RecordBatch
Expand Down
7 changes: 1 addition & 6 deletions datafusion/physical-plan/src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::Arc;
use super::expressions::PhysicalSortExpr;
use super::{DisplayAs, SendableRecordBatchStream};
use crate::stream::RecordBatchStreamAdapter;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics};
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};

use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::display::StringifiedPlan;
Expand Down Expand Up @@ -167,11 +167,6 @@ impl ExecutionPlan for ExplainExec {
futures::stream::iter(vec![Ok(record_batch)]),
)))
}

fn statistics(&self) -> Result<Statistics> {
// Statistics of an EXPLAIN plan are not relevant
Ok(Statistics::new_unknown(&self.schema()))
}
}

/// If this plan should be shown, given the previous plan that was
Expand Down
5 changes: 0 additions & 5 deletions datafusion/physical-plan/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use std::sync::Arc;
use super::expressions::PhysicalSortExpr;
use super::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
Statistics,
};
use crate::metrics::MetricsSet;
use crate::stream::RecordBatchStreamAdapter;
Expand Down Expand Up @@ -276,10 +275,6 @@ impl ExecutionPlan for FileSinkExec {
stream,
)))
}

fn statistics(&self) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema()))
}
}

/// Create an output record batch with a count
Expand Down
8 changes: 6 additions & 2 deletions datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,12 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
None
}

/// Returns the global output statistics for this `ExecutionPlan` node.
fn statistics(&self) -> Result<Statistics>;
/// Returns statistics for this `ExecutionPlan` node. If statistics are not
/// available, should return [`Statistics::new_unknown`] (the default), not
/// an error.
fn statistics(&self) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema()))
}
}

/// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful
Expand Down
6 changes: 1 addition & 5 deletions datafusion/physical-plan/src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::stream::RecordBatchStreamAdapter;
use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream};

use arrow::datatypes::SchemaRef;
use datafusion_common::{internal_err, plan_err, DataFusionError, Result, Statistics};
use datafusion_common::{internal_err, plan_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};

Expand Down Expand Up @@ -187,8 +187,4 @@ impl ExecutionPlan for StreamingTableExec {
None => stream,
})
}

fn statistics(&self) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema()))
}
}
12 changes: 0 additions & 12 deletions datafusion/physical-plan/src/test/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,10 +453,6 @@ impl ExecutionPlan for ErrorExec {
) -> Result<SendableRecordBatchStream> {
internal_err!("ErrorExec, unsurprisingly, errored in partition {partition}")
}

fn statistics(&self) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema()))
}
}

/// A mock execution plan that simply returns the provided statistics
Expand Down Expand Up @@ -627,10 +623,6 @@ impl ExecutionPlan for BlockingExec {
_refs: Arc::clone(&self.refs),
}))
}

fn statistics(&self) -> Result<Statistics> {
unimplemented!()
}
}

/// A [`RecordBatchStream`] that is pending forever.
Expand Down Expand Up @@ -764,10 +756,6 @@ impl ExecutionPlan for PanicExec {
ready: false,
}))
}

fn statistics(&self) -> Result<Statistics> {
unimplemented!()
}
}

/// A [`RecordBatchStream`] that yields every other batch and panics
Expand Down
6 changes: 1 addition & 5 deletions datafusion/physical-plan/src/unnest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use super::DisplayAs;
use crate::{
expressions::Column, DisplayFormatType, Distribution, EquivalenceProperties,
ExecutionPlan, Partitioning, PhysicalExpr, PhysicalSortExpr, RecordBatchStream,
SendableRecordBatchStream, Statistics,
SendableRecordBatchStream,
};

use arrow::array::{
Expand Down Expand Up @@ -159,10 +159,6 @@ impl ExecutionPlan for UnnestExec {
unnest_time: 0,
}))
}

fn statistics(&self) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema()))
}
}

/// A stream that issues [RecordBatch]es with unnested column data.
Expand Down