Fix sequential metadata fetching in ListingTable causing high latency #14918


Merged: 3 commits into apache:main on Mar 3, 2025

Conversation

@geoffreyclaude (Contributor) commented Feb 27, 2025

Which issue does this PR close?

Rationale for this change

When scanning an exact list of remote Parquet files, the `ListingTable` was fetching file metadata (via `head` calls) sequentially. This was due to using `stream::iter(file_list).flatten()`, which processes each one-item stream in order. For remote blob stores, where each `head` call can take tens to hundreds of milliseconds, this sequential behavior significantly increased the time to create the physical plan.

What changes are included in this PR?

This commit replaces the sequential flattening with concurrent merging using `stream::iter(file_list).flatten_unordered(meta_fetch_concurrency)`. With this change, the `head` requests are executed in parallel (up to the configured `meta_fetch_concurrency` limit), reducing the latency of creating the physical plan.
Note that the loss of ordering introduced by `flatten_unordered` is perfectly acceptable, as the file list is fully sorted by path in `split_files` before being returned anyway.
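
For intuition, here is a minimal self-contained sketch of the difference (plain `futures` + `tokio`, not the DataFusion code; the 50 ms sleep stands in for a remote `head` call): `flatten()` drains each one-item stream in order, so the latencies add up, while `flatten_unordered(limit)` overlaps them up to the configured limit.

use futures::stream::{self, StreamExt};
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    // Each inner stream yields one item after a simulated 50 ms `head` call.
    let make_streams = || {
        (0..8).map(|i| {
            stream::once(async move {
                tokio::time::sleep(Duration::from_millis(50)).await;
                i
            })
            .boxed()
        })
    };

    // Sequential: the one-item streams are polled to completion in order (~8 x 50 ms).
    let start = Instant::now();
    let _ = stream::iter(make_streams()).flatten().collect::<Vec<_>>().await;
    println!("flatten:           {:?}", start.elapsed());

    // Concurrent: up to `meta_fetch_concurrency` inner streams are polled at once (~50 ms total).
    let meta_fetch_concurrency = 32usize;
    let start = Instant::now();
    let _ = stream::iter(make_streams())
        .flatten_unordered(meta_fetch_concurrency)
        .collect::<Vec<_>>()
        .await;
    println!("flatten_unordered: {:?}", start.elapsed());
}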

Are these changes tested?

Tests have been updated to ensure that metadata fetching occurs concurrently.

Are there any user-facing changes?

No user-facing changes other than the reduced latency in this situation.

-let file_list = stream::iter(file_list).flatten();
+let meta_fetch_concurrency =
+    ctx.config_options().execution.meta_fetch_concurrency;
+let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
@geoffreyclaude (author):

This is the actual fix; the rest of the changes are unit tests.

@@ -1115,7 +1117,7 @@ impl ListingTable {
}
})
.boxed()
-.buffered(ctx.config_options().execution.meta_fetch_concurrency);
+.buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
@geoffreyclaude (author):

Ordering does not matter here, so we might as well use `buffer_unordered` to avoid stalling on an outlier.
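
As a rough illustration of that point, a small sketch (again plain `futures` + `tokio`, not the ListingTable code): `buffered(n)` yields results in submission order, so one slow outlier holds back everything queued behind it, while `buffer_unordered(n)` yields each result as soon as it resolves.

use futures::stream::{self, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // One slow "metadata fetch" followed by three fast ones.
    let delays_ms = [300u64, 10, 10, 10];
    let fetches = || {
        stream::iter(delays_ms).map(|ms| async move {
            tokio::time::sleep(Duration::from_millis(ms)).await;
            ms
        })
    };

    // Ordered: nothing is yielded until the 300 ms outlier finishes.
    let ordered: Vec<_> = fetches().buffered(4).collect().await;
    // Unordered: the fast fetches are yielded as they finish, the outlier last.
    let unordered: Vec<_> = fetches().buffer_unordered(4).collect().await;

    println!("buffered:         {ordered:?}"); // [300, 10, 10, 10]
    println!("buffer_unordered: {unordered:?}"); // typically [10, 10, 10, 300]
}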

@alamb (Contributor) commented Feb 28, 2025

Thanks @geoffreyclaude -- I kicked off the CI for this

@alamb mentioned this pull request Feb 28, 2025
@xudong963 self-requested a review Feb 28, 2025
@alamb (Contributor) commented Feb 28, 2025

I pushed a commit to fix the formatting and try to get a clean CI run

@geoffreyclaude (author) replied:

> I pushed a commit to fix the formatting and try to get a clean CI run

Thanks! I had environment issues that prevented me from running the CI locally. I figured adding an extra basic unit test was simple enough to skip it 🤦

.execution
.meta_fetch_concurrency;
let expected_concurrency = files.len().min(meta_fetch_concurrency);
let head_blocking_store = ensure_head_concurrency(store, expected_concurrency);

Reviewer: this is very cool

@alamb (Contributor) left a comment:

Thank you @geoffreyclaude -- this is a really nice PR: well documented and well tested 🦾

"{} received head call for {location}",
BlockingObjectStore::NAME
);
// Wait until the expected number of concurrent calls is reached, but timeout after 1 second to avoid hanging failing tests.

Reviewer: 👍
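
The timeout-guarded barrier is the core of the concurrency assertion. As a rough sketch of the same idea (illustrative only; `simulated_head` is a stand-in, not the actual BlockingObjectStore / ensure_head_concurrency test code): every concurrent call parks on a barrier sized to the expected concurrency, and the 1 second timeout turns a would-be hang into a fast failure if the calls are actually sequential.

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Barrier;
use tokio::time::timeout;

// Stand-in for a `head` call: it only succeeds once `expected_concurrency`
// callers are waiting on the barrier at the same time.
async fn simulated_head(barrier: Arc<Barrier>) -> Result<(), &'static str> {
    match timeout(Duration::from_secs(1), barrier.wait()).await {
        Ok(_) => Ok(()),
        // Sequential callers never fill the barrier, so the timeout fails the
        // test quickly instead of hanging it.
        Err(_) => Err("head calls did not reach the expected concurrency"),
    }
}

#[tokio::main]
async fn main() {
    let expected_concurrency = 4;
    let barrier = Arc::new(Barrier::new(expected_concurrency));

    let handles: Vec<_> = (0..expected_concurrency)
        .map(|_| tokio::spawn(simulated_head(Arc::clone(&barrier))))
        .collect();

    for handle in handles {
        handle.await.unwrap().unwrap();
    }
    println!("all {expected_concurrency} simulated head calls overlapped");
}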

@alamb added the performance (Make DataFusion faster) label Feb 28, 2025
@alamb (Contributor) commented Mar 3, 2025

I tried to verify these changes, but I couldn't figure out how to create an external table that explicitly lists the file names via SQL.

For posterity here is what I tried:

This PR:

> create external table hits stored as parquet OPTIONS('aws.region' 'eu-central-1') location 's3://clickhouse-public-datasets/hits_compatible/athena_partitioned/';
0 row(s) fetched.
Elapsed 2.690 seconds.

Main

> create external table hits stored as parquet OPTIONS('aws.region' 'eu-central-1') location 's3://clickhouse-public-datasets/hits_compatible/athena_partitioned/';
0 row(s) fetched.
Elapsed 2.284 seconds.

I am going to try and whip up a smaller reproducer

@geoffreyclaude (author) replied:

> I tried to verify these changes, but I couldn't figure out how to create an external table that explicitly lists the file names via SQL.

We got there through Substrait, by converting a custom input into a `ParquetExec`... So not the most straightforward one-line reproducer, unfortunately!

@alamb (Contributor) commented Mar 3, 2025

I still could not reproduce any improvement with this PR, FWIW. I still think it is a good change, so I merged it in, but it would be nice to find some benchmark results that show the improvement.

Details

use std::sync::Arc;
use std::time::Instant;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl};
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    let object_store_url = ObjectStoreUrl::parse("https://datasets.clickhouse.com").unwrap();
    let object_store = object_store::http::HttpBuilder::new()
        .with_url(object_store_url.as_str())
        .build()
        .unwrap();

    ctx.register_object_store(object_store_url.as_ref(),
        Arc::new(object_store));


    // urls are like
    // https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet'
    //let base_url = ObjectStoreUrl::parse("https://datasets.clickhouse.com").unwrap();
    let paths: Vec<ListingTableUrl> = (1..100).map(|i| format!("https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_{i}.parquet"))
        .map(|url| ListingTableUrl::parse(&url).unwrap())
        .collect();

    let listing_options = ListingOptions::new(Arc::new(ParquetFormat::new()))
        .with_collect_stat(true);

    let start = Instant::now();
    println!("Creating table / reading statistics....");
    let config = ListingTableConfig::new_with_multi_paths(paths)
        .with_listing_options(listing_options)
        .infer_schema(&ctx.state()).await?;
    let listing_table = ListingTable::try_new(config).unwrap();
    let df = ctx.read_table(Arc::new(listing_table))?;
    println!("Done in {:?}", Instant::now() - start);

    println!("running query");
    let start = Instant::now();
    let batches = df.limit(0, Some(10))?.collect().await.unwrap();
    println!("Got {} batches in  {:?}", batches.len(), Instant::now() - start);

    Ok(())
}

Some testing numbers (the results vary wildly)

On this branch

Creating table / reading statistics....
Done in 250.333042ms
running query
Got 1 batches in  1.943637416s
hello world!
(venv) andrewlamb@Andrews-MacBook-Pro-2:~/Software/rust_playground$ cargo run --release
    Finished `release` profile [optimized] target(s) in 0.21s
     Running `target/release/rust_playground`
Creating table / reading statistics....
Done in 174.578ms
running query
Got 1 batches in  1.62131175s
hello world!
(venv) andrewlamb@Andrews-MacBook-Pro-2:~/Software/rust_playground$ cargo run --release
    Finished `release` profile [optimized] target(s) in 0.12s
     Running `target/release/rust_playground`
Creating table / reading statistics....
Done in 191.24325ms
running query
Got 1 batches in  1.257049458s
hello world!

On main

Creating table / reading statistics....
Done in 165.25ms
running query
Got 1 batches in  819.607625ms
hello world!
(venv) andrewlamb@Andrews-MacBook-Pro-2:~/Software/rust_playground$ cargo run --release
    Finished `release` profile [optimized] target(s) in 0.20s
     Running `target/release/rust_playground`
Creating table / reading statistics....
Done in 165.120666ms
running query
Got 1 batches in  1.036410625s
hello world!
(venv) andrewlamb@Andrews-MacBook-Pro-2:~/Software/rust_playground$ cargo run --release
    Finished `release` profile [optimized] target(s) in 0.10s
     Running `target/release/rust_playground`
Creating table / reading statistics....
Done in 198.459166ms
running query
Got 1 batches in  831.307041ms
hello world!

@alamb merged commit 1a6390b into apache:main Mar 3, 2025
24 checks passed
@alamb (Contributor) commented Mar 3, 2025

Thanks again @geoffreyclaude

@geoffreyclaude (author) replied:

> I still could not reproduce any improvement with this PR, FWIW. I still think it is a good change, so I merged it in, but it would be nice to find some benchmark results that show the improvement.

@alamb: I've updated your reproducer to do a COUNT(*) instead of a LIMIT(10). The COUNT(*) only needs to read the metadata from the object store, since the number of rows is stored in the Parquet metadata, so the query effectively "runs" during physical plan creation.

Without the fix, the physical plan creation takes ~2.5 seconds.
With the fix, it drops to ~700ms.

The logical planning and query execution take the same time in both.

use arrow::util::pretty::pretty_format_batches;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::functions_aggregate::expr_fn::count;
use datafusion::logical_expr::utils::COUNT_STAR_EXPANSION;
use datafusion::physical_plan::displayable;
use datafusion::prelude::{Expr, SessionContext};
use futures::StreamExt;
use std::sync::Arc;
use std::time::Instant;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    let object_store_url =
        ObjectStoreUrl::parse("https://datasets.clickhouse.com").unwrap();
    let object_store = object_store::http::HttpBuilder::new()
        .with_url(object_store_url.as_str())
        .build()
        .unwrap();

    ctx.register_object_store(object_store_url.as_ref(), Arc::new(object_store));

    // urls are like
    // https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet'
    //let base_url = ObjectStoreUrl::parse("https://datasets.clickhouse.com").unwrap();
    let paths: Vec<ListingTableUrl> = (1..100).map(|i| format!("https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_{i}.parquet"))
        .map(|url| ListingTableUrl::parse(&url).unwrap())
        .collect();

    let listing_options =
        ListingOptions::new(Arc::new(ParquetFormat::new())).with_collect_stat(true);

    println!("Creating table / reading statistics....");
    let config = ListingTableConfig::new_with_multi_paths(paths)
        .with_listing_options(listing_options)
        .infer_schema(&ctx.state())
        .await?;
    let start = Instant::now();
    let listing_table = ListingTable::try_new(config).unwrap();
    let df = ctx
        .read_table(Arc::new(listing_table))?
        .aggregate(vec![], vec![count(Expr::Literal(COUNT_STAR_EXPANSION))])?;
    println!("Done in {:?}", Instant::now() - start);

    let logical_plan_str = df.logical_plan().display_indent().to_string();
    println!("Logical plan:\n{logical_plan_str}");

    println!("Creating physical plan...");

    let start = Instant::now();
    let physical_plan = df.create_physical_plan().await?;
    println!("Done in {:?}", Instant::now() - start);

    let physical_plan_str = displayable(physical_plan.as_ref()).indent(true).to_string();
    println!("Physical plan:\n{physical_plan_str}");

    println!("Running query...");
    let start = Instant::now();
    let mut result_stream = physical_plan.execute(0, ctx.task_ctx())?;
    let mut batches = vec![];

    while let Some(record) = result_stream.next().await {
        batches.push(record?);
    }
    println!(
        "Got {} batches in  {:?}",
        batches.len(),
        Instant::now() - start
    );

    let response_str = pretty_format_batches(&batches)?.to_string();
    println!("Query result:\n{response_str}");

    Ok(())
}

main

Creating table / reading statistics....
Done in 2.267666ms
Logical plan:
Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
  TableScan: ?table?
Creating physical plan...
Done in 2.510654875s
Physical plan:
ProjectionExec: expr=[98997497 as count(Int64(1))]
  PlaceholderRowExec

Running query...
Got 1 batches in  215.333µs
Query result:
+-----------------+
| count(Int64(1)) |
+-----------------+
| 98997497        |
+-----------------+

fix/concurrent_heads

Creating table / reading statistics....
Done in 2.187417ms
Logical plan:
Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
  TableScan: ?table?
Creating physical plan...
Done in 686.674166ms
Physical plan:
ProjectionExec: expr=[98997497 as count(Int64(1))]
  PlaceholderRowExec

Running query...
Got 1 batches in  191.458µs
Query result:
+-----------------+
| count(Int64(1)) |
+-----------------+
| 98997497        |
+-----------------+

@geoffreyclaude deleted the fix/concurrent_heads branch Mar 4, 2025
@alamb (Contributor) commented Apr 1, 2025

This was referenced by @sergiimk in https://discord.com/channels/885562378132000778/1290751484807352412/1356393367566553240 (they hit the same problem and were pleased to find it fixed!)

sergiimk pushed a commit to kamu-data/datafusion that referenced this pull request Apr 5, 2025
…apache#14918)

* Fix sequential metadata fetching in ListingTable causing high latency


* fix fmt

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Labels: core (Core DataFusion crate), performance (Make DataFusion faster)

Successfully merging this pull request may close these issues.

Slow Physical Plan Creation for Remote Parquet Files