Skip to content

refactor: rename FileStream.file_reader to file_opener & update doc #8883

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 1 commit into from
Jan 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions datafusion/core/src/datasource/physical_plan/file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,9 @@ pub struct FileStream<F: FileOpener> {
projected_schema: SchemaRef,
/// The remaining number of records to parse, None if no limit
remain: Option<usize>,
/// A closure that takes a reader and an optional remaining number of lines
/// (before reaching the limit) and returns a batch iterator. If the file reader
/// is not capable of limiting the number of records in the last batch, the file
/// stream will take care of truncating it.
file_reader: F,
/// A generic [`FileOpener`]. Calling `open()` returns a [`FileOpenFuture`],
/// which can be resolved to a stream of `RecordBatch`.
file_opener: F,
/// The partition column projector
pc_projector: PartitionColumnProjector,
/// The stream state
Expand Down Expand Up @@ -250,7 +248,7 @@ impl<F: FileOpener> FileStream<F> {
pub fn new(
config: &FileScanConfig,
partition: usize,
file_reader: F,
file_opener: F,
metrics: &ExecutionPlanMetricsSet,
) -> Result<Self> {
let (projected_schema, ..) = config.project();
Expand All @@ -269,7 +267,7 @@ impl<F: FileOpener> FileStream<F> {
file_iter: files.into(),
projected_schema,
remain: config.limit,
file_reader,
file_opener,
pc_projector,
state: FileStreamState::Idle,
file_stream_metrics: FileStreamMetrics::new(metrics, partition),
Expand Down Expand Up @@ -301,7 +299,7 @@ impl<F: FileOpener> FileStream<F> {
};

Some(
self.file_reader
self.file_opener
.open(file_meta)
.map(|future| (future, part_file.partition_values)),
)
Expand Down