Support eager loading page index parquet metadata #18112
Conversation
Thank you @zhuqi-lucas
Given that the policy for when to load the page index is a tradeoff between IO and memory/latency (eagerly loading the page index may cause more IO but lower latency), I think a tuning knob is appropriate.
However, I am not quite sure what effect this change has on the number of object store requests.
Using the tooling that @BlakeOrth has been working on for instrumenting datafusion-cli, we can see what requests are actually being made.
Here is the trace for main
```
DataFusion CLI v50.2.0
> \object_store_profiling trace
ObjectStore Profile mode set to Trace
> SELECT COUNT(*) from 'https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet' where "SearchPhrase" <> '';
+----------+
| count(*) |
+----------+
| 131559   |
+----------+
1 row(s) fetched.
Elapsed 0.606 seconds.

Object Store Profiling
Instrumented Object Store: instrument_mode: Trace, inner: HttpStore
2025-10-17T08:55:22.202041+00:00 operation=Get duration=0.025994s size=8 range: bytes=174965036-174965043 path=hits_compatible/athena_partitioned/hits_1.parquet
2025-10-17T08:55:22.228064+00:00 operation=Get duration=0.028127s size=34322 range: bytes=174930714-174965035 path=hits_compatible/athena_partitioned/hits_1.parquet
2025-10-17T08:55:22.295696+00:00 operation=Get duration=0.032303s size=15503 range: bytes=5120273-5135775 path=hits_compatible/athena_partitioned/hits_1.parquet
2025-10-17T08:55:22.296663+00:00 operation=Get duration=0.060797s size=3895852 range: bytes=145483536-149379387 path=hits_compatible/athena_partitioned/hits_1.parquet
2025-10-17T08:55:22.330266+00:00 operation=Get duration=0.040970s size=61815 range: bytes=46392516-46454330 path=hits_compatible/athena_partitioned/hits_1.parquet
```
Here is the trace for this PR:
```
ObjectStore Profile mode set to Trace
> set datafusion.execution.parquet.eager_load_page_index = true;
0 row(s) fetched.
Elapsed 0.011 seconds.

Object Store Profiling
> SELECT COUNT(*) from 'https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet' where "SearchPhrase" <> '';
+----------+
| count(*) |
+----------+
| 131559   |
+----------+
1 row(s) fetched.
Elapsed 0.776 seconds.

Object Store Profiling
Instrumented Object Store: instrument_mode: Trace, inner: HttpStore
2025-10-17T09:00:12.866861+00:00 operation=Get duration=0.027121s size=8 range: bytes=174965036-174965043 path=hits_compatible/athena_partitioned/hits_1.parquet
2025-10-17T09:00:12.894311+00:00 operation=Get duration=0.030500s size=34322 range: bytes=174930714-174965035 path=hits_compatible/athena_partitioned/hits_1.parquet
2025-10-17T09:00:12.997688+00:00 operation=Get duration=0.027598s size=3895852 range: bytes=145483536-149379387 path=hits_compatible/athena_partitioned/hits_1.parquet
2025-10-17T09:00:12.997693+00:00 operation=Get duration=0.080185s size=15503 range: bytes=5120273-5135775 path=hits_compatible/athena_partitioned/hits_1.parquet
2025-10-17T09:00:13.095884+00:00 operation=Get duration=0.039606s size=61815 range: bytes=46392516-46454330 path=hits_compatible/athena_partitioned/hits_1.parquet

Summaries:
Get
  count: 5
  duration min: 0.027121s
  duration max: 0.080185s
  duration avg: 0.041002s
  size min: 8 B
  size max: 3895852 B
  size avg: 801500 B
  size sum: 4007500 B
```
You can see in both cases that 5 (!!) requests are made.
Actually, I forgot that https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet does not have page indexes. I rewrote this file so that it does: `copy 'hits_1.parquet' to 'hits_1_index.parquet';` I was having trouble getting the trace thing to work.
It seems we always make at least 2 requests to S3 if we enable the page index; I will investigate more:

```rust
/// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
/// given a [`MetadataFetch`].
///
/// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
/// performed by this function.
#[cfg(all(feature = "async", feature = "arrow"))]
pub async fn try_load<F: MetadataFetch>(&mut self, mut fetch: F, file_size: u64) -> Result<()> {
    let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;
    self.metadata = Some(metadata);

    // we can return if page indexes aren't requested
    if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
    {
        return Ok(());
    }

    self.load_page_index_with_remainder(fetch, remainder).await
}
```

I am not sure whether `with_prefetch_hint` can make eager loading of the page index parquet metadata happen in one request... From the description here, it seems it will help for the page index:

```rust
/// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`].
/// Only used for the asynchronous [`Self::try_load()`] method.
///
/// By default, the reader will first fetch the last 8 bytes of the input file to obtain the
/// size of the footer metadata. A second fetch will be performed to obtain the needed bytes.
/// After parsing the footer metadata, a third fetch will be performed to obtain the bytes
/// needed to decode the page index structures, if they have been requested. To avoid
/// unnecessary fetches, `prefetch` can be set to an estimate of the number of bytes needed
/// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and
/// reduce latency. Setting `prefetch` too small will not trigger an error, but will result
/// in extra fetches being performed.
pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
    self.prefetch_hint = prefetch;
    self
}
```
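To make the tradeoff concrete, here is a small self-contained sketch (it does not use the parquet crate; `CountingFetcher` and `simulate_load` are illustrative names, and the region layout is the one described in the doc comment above) that simulates the fetch pattern: without a prefetch hint the reader issues three requests (8-byte footer, footer metadata, page index), while a sufficiently large prefetch covers all three ranges in a single request.

```rust
use std::ops::Range;

/// Toy byte-range fetcher that records each request it receives
/// (illustrative only; this is not the parquet crate's `MetadataFetch`).
struct CountingFetcher {
    requests: Vec<Range<u64>>,
}

impl CountingFetcher {
    fn fetch(&mut self, range: Range<u64>) {
        self.requests.push(range);
    }
}

/// Simulate loading the footer metadata plus page index with an optional
/// prefetch hint, returning how many object store requests were made.
/// Assumed layout (from the end of the file):
/// [page index][footer metadata][8-byte footer]
fn simulate_load(
    file_len: u64,
    footer_len: u64,
    page_index_len: u64,
    prefetch: Option<u64>,
) -> usize {
    let mut f = CountingFetcher { requests: vec![] };
    let needed = page_index_len + footer_len + 8;
    match prefetch {
        // A big enough prefetch covers everything in one request
        Some(p) if p >= needed => f.fetch(file_len - p..file_len),
        _ => {
            // 1: last 8 bytes to learn the footer metadata length
            f.fetch(file_len - 8..file_len);
            // 2: the footer metadata itself
            f.fetch(file_len - 8 - footer_len..file_len - 8);
            // 3: the page index, which sits before the footer metadata
            f.fetch(file_len - needed..file_len - needed + page_index_len);
        }
    }
    f.requests.len()
}

fn main() {
    // Without a hint: three requests, as the doc comment describes
    assert_eq!(simulate_load(1_000_000, 34_322, 15_503, None), 3);
    // With a sufficiently large hint: a single request
    assert_eq!(simulate_load(1_000_000, 34_322, 15_503, Some(65_536)), 1);
    println!("ok");
}
```

A hint that is too small simply falls through to the three-request path, matching the doc comment's note that a small `prefetch` causes extra fetches rather than an error.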
More details: it seems DataFusion by default does not set `metadata_size_hint`; we can try adding it and testing:

```rust
/// (reading) If specified, the parquet reader will try and fetch the last `size_hint`
/// bytes of the parquet file optimistically. If not specified, two reads are required:
/// One read to fetch the 8-byte parquet footer and
/// another to fetch the metadata length encoded in the footer
pub metadata_size_hint: Option<usize>, default = None
```
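For example, this option can be set per session in datafusion-cli (the option name comes from the config doc above; the 512 KiB value here is just an illustrative guess at the metadata size, not a recommended default):

```sql
-- optimistically fetch the last 512 KiB, hopefully covering the footer,
-- the metadata, and (ideally) the page index in a single read
set datafusion.execution.parquet.metadata_size_hint = 524288;
```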
I filed a potentially related ticket here:
Got it, thank you @alamb!
From what I can tell, to read ParquetMetaData with the default configuration of ListingTable, DataFusion will issue 3 object store requests:
1. the last 8 bytes of the file, to find the size of the footer metadata
2. the footer metadata itself
3. the bytes needed to decode the page index structures (if requested)

The first 8-byte request could be avoided by changing the default prefetch_hint, aka #18118.

I think you could potentially avoid the third request if you extended the prefetch_hint code to use the page index if it was fetched in the initial request. So the flow would be: DataFusion makes an initial request for the last `prefetch_hint` bytes of the file, and the page index is parsed from those bytes if they cover it.

The newly added push metadata decoder likely makes this easier to implement (as it will tell you what ranges are needed).
I agree that today that is not how the code works. However, I think it is technically possible and likely not super hard to implement -- not sure if it is best done in DataFusion or arrow-rs.
I've taken the task #18118; at least we can reduce one request.
Cool, I will look into this.
Great, I agree this will be a further improvement; I will look into it!
This PR is still needed: currently we split loading the metadata into two parts when the page index is enabled, and the second part (loading the page index) does not use the remainder. So even if we set prefetch_hint now, we cannot reduce the page index request without this PR.
```rust
/// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
/// been obtained. See [`Self::new_with_metadata()`].
#[cfg(all(feature = "async", feature = "arrow"))]
pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
    self.load_page_index_with_remainder(fetch, None).await
}
```
As I understand it, your goal is to reduce the number of object store requests when loading parquet metadata (which is a good goal 👍). However, I am not sure this PR achieves this goal -- instead, what I think this PR does is change when the requests are made to the object store (a bit earlier in the processing pipeline), but not the actual number of them.

I think the best way to ensure we minimize the object store requests is to rely on the prefetch: if prefetch fetches enough bytes, this strategy will result in a single object store request to read all required metadata.

I realize setting prefetch today may not actually also parse the page index, but I think that is what we should be working towards (rather than adding another flag, unless there is some need I am missing).

I personally suggest starting with an end-to-end type test (perhaps in https://github.com/apache/datafusion/blob/main/datafusion/core/tests/parquet_config.rs) that illustrates what is happening. Then we can configure various prefetch settings and ensure that only the expected number of requests are made.
Thank you @alamb, this is a good suggestion. I will try to do this! What I meant is that current DataFusion will not apply prefetch_hint for the page index, because we split the loading into two calls, and the second one does not pass any prefetch_hint, so the result of the first load does not carry over to it... Here is the code:

It passes `None` to `load_page_index_with_remainder`, so the hint will not be used for the second load:

```rust
/// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
/// been obtained. See [`Self::new_with_metadata()`].
#[cfg(all(feature = "async", feature = "arrow"))]
pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
    self.load_page_index_with_remainder(fetch, None).await
}
```

In the detailed code, if we don't pass the remainder, we will not use the already fetched data:

```rust
#[cfg(all(feature = "async", feature = "arrow"))]
async fn load_page_index_with_remainder<F: MetadataFetch>(
    &mut self,
    mut fetch: F,
    remainder: Option<(usize, Bytes)>,
) -> Result<()> {
    if self.metadata.is_none() {
        return Err(general_err!("Footer metadata is not present"));
    }

    // Get bounds needed for page indexes (if any are present in the file).
    let range = self.range_for_page_index();
    let range = match range {
        Some(range) => range,
        None => return Ok(()),
    };

    let bytes = match &remainder {
        Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
            let remainder_start = *remainder_start as u64;
            let offset = usize::try_from(range.start - remainder_start)?;
            let end = usize::try_from(range.end - remainder_start)?;
            assert!(end <= remainder.len());
            remainder.slice(offset..end)
        }
        // Note: this will potentially fetch data already in remainder, this keeps things simple
        _ => fetch.fetch(range.start..range.end).await?,
    };

    // Sanity check
    assert_eq!(bytes.len() as u64, range.end - range.start);

    self.parse_column_index(&bytes, range.start)?;
    self.parse_offset_index(&bytes, range.start)?;
    Ok(())
}
```

And for the first load, we pass the right remainder, which can make use of prefetch_hint:

```rust
// Begin by loading the metadata from the underlying reader (note
// the returned metadata may actually include page indexes as some
// readers may return page indexes even when not requested -- for
// example when they are cached)
let mut reader_metadata =
    ArrowReaderMetadata::load_async(&mut async_file_reader, options.clone())
        .await?;
```

```rust
/// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
/// given a [`MetadataFetch`].
///
/// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
/// performed by this function.
#[cfg(all(feature = "async", feature = "arrow"))]
pub async fn try_load<F: MetadataFetch>(&mut self, mut fetch: F, file_size: u64) -> Result<()> {
    let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;
    self.metadata = Some(metadata);

    // we can return if page indexes aren't requested
    if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
    {
        return Ok(());
    }

    self.load_page_index_with_remainder(fetch, remainder).await
}
```

Correct me if I am wrong. I am not sure if I am missing something. Thanks!
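The remainder reuse that `try_load` performs (and that `load_page_index` skips by passing `None`) boils down to the slicing decision below. This is a self-contained sketch with illustrative types (`Remainder` and `slice_or_fetch` are made-up names, and plain `Vec<u8>` stands in for `Bytes`), mirroring the `Some((remainder_start, remainder))` branch quoted above:

```rust
use std::ops::Range;

/// Illustrative stand-in for the buffered suffix of the file:
/// `start` is the file offset at which `bytes` begins.
struct Remainder {
    start: u64,
    bytes: Vec<u8>,
}

/// Return the page-index bytes from the remainder if it fully covers `range`;
/// otherwise return None, meaning a fresh fetch would be required
/// (as always happens when no remainder is passed).
fn slice_or_fetch(remainder: Option<&Remainder>, range: Range<u64>) -> Option<Vec<u8>> {
    match remainder {
        Some(r)
            if r.start <= range.start
                && (range.end - r.start) as usize <= r.bytes.len() =>
        {
            let offset = (range.start - r.start) as usize;
            let end = (range.end - r.start) as usize;
            Some(r.bytes[offset..end].to_vec())
        }
        // No remainder, or the remainder starts after the needed range:
        // the caller must issue another object store request.
        _ => None,
    }
}

fn main() {
    let r = Remainder { start: 100, bytes: vec![0u8; 50] }; // covers bytes 100..150
    assert!(slice_or_fetch(Some(&r), 110..140).is_some()); // served from the buffer
    assert!(slice_or_fetch(Some(&r), 90..140).is_none());  // starts before the buffer
    assert!(slice_or_fetch(None, 110..140).is_none());     // no remainder: must fetch
    println!("ok");
}
```

This is why passing the remainder through to the page index load matters: with a large enough prefetch, the page index range usually falls inside the buffered suffix and the extra request disappears.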
Which issue does this PR close?
Rationale for this change
Currently, loading the metadata is split into two parts when the page index is enabled. This causes a heavy S3 request load when Parquet files are stored in S3 without a local cache.
This PR adds support for eagerly loading the page index parquet metadata, combining the two loads into one, which is the original DataFusion implementation.
What changes are included in this PR?
Support for eagerly loading the page index parquet metadata: when the page index is enabled, the two metadata loads are combined into one (the original DataFusion implementation).
Are these changes tested?
Yes
Are there any user-facing changes?
No