Skip to content

Commit ec00112

Browse files
authored
Conditionally build page pruning predicates (apache#21480)
<!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. - Closes #. --> ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> Page pruning predicates in the Parquet opener are constructed regardless of whether enable_page_index is set. Under high query load, this uses significant CPU time although these predicates are created and discarded quickly. ## Which issue does this PR close? ## What changes are included in this PR? This commit reorders the predicate creation flow to only construct page pruning predicates if enable_page_index is enabled. Regular predicates are always created, as before. <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? I am relying on unit tests but I can do manual testing with a debugger. <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? These changes are an optimization and are not user-facing. <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. 
--> <img width="1920" height="840" alt="image" src="https://github.com/user-attachments/assets/ae7797eb-2b44-4b2f-bf41-28e4e99f2386" />
1 parent d4e629f commit ec00112

File tree

2 files changed

+93
-32
lines changed

2 files changed

+93
-32
lines changed

datafusion/datasource-parquet/src/opener.rs

Lines changed: 89 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -920,12 +920,22 @@ impl MetadataLoadedParquetOpen {
920920
prepared.physical_file_schema = Arc::clone(&physical_file_schema);
921921

922922
// Build predicates for this specific file
923-
let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
923+
let pruning_predicate = build_pruning_predicates(
924924
prepared.predicate.as_ref(),
925925
&physical_file_schema,
926926
&prepared.predicate_creation_errors,
927927
);
928928

929+
// Only build page pruning predicate if page index is enabled
930+
let page_pruning_predicate = if prepared.enable_page_index {
931+
prepared.predicate.as_ref().and_then(|predicate| {
932+
let p = build_page_pruning_predicate(predicate, &physical_file_schema);
933+
(p.filter_number() > 0).then_some(p)
934+
})
935+
} else {
936+
None
937+
};
938+
929939
Ok(FiltersPreparedParquetOpen {
930940
loaded: MetadataLoadedParquetOpen {
931941
prepared,
@@ -945,10 +955,7 @@ impl FiltersPreparedParquetOpen {
945955
// metadata load above may not have read the page index structures yet.
946956
// If we need them for reading and they aren't yet loaded, we need to
947957
// load them now.
948-
if should_enable_page_index(
949-
self.loaded.prepared.enable_page_index,
950-
&self.page_pruning_predicate,
951-
) {
958+
if self.page_pruning_predicate.is_some() {
952959
self.loaded.reader_metadata = load_page_index(
953960
self.loaded.reader_metadata,
954961
&mut self.loaded.prepared.async_file_reader,
@@ -1661,20 +1668,13 @@ pub(crate) fn build_pruning_predicates(
16611668
predicate: Option<&Arc<dyn PhysicalExpr>>,
16621669
file_schema: &SchemaRef,
16631670
predicate_creation_errors: &Count,
1664-
) -> (
1665-
Option<Arc<PruningPredicate>>,
1666-
Option<Arc<PagePruningAccessPlanFilter>>,
1667-
) {
1668-
let Some(predicate) = predicate.as_ref() else {
1669-
return (None, None);
1670-
};
1671-
let pruning_predicate = build_pruning_predicate(
1671+
) -> Option<Arc<PruningPredicate>> {
1672+
let predicate = predicate.as_ref()?;
1673+
build_pruning_predicate(
16721674
Arc::clone(predicate),
16731675
file_schema,
16741676
predicate_creation_errors,
1675-
);
1676-
let page_pruning_predicate = build_page_pruning_predicate(predicate, file_schema);
1677-
(pruning_predicate, Some(page_pruning_predicate))
1677+
)
16781678
}
16791679

16801680
/// Returns a `ArrowReaderMetadata` with the page index loaded, loading
@@ -1708,18 +1708,6 @@ async fn load_page_index<T: AsyncFileReader>(
17081708
}
17091709
}
17101710

1711-
fn should_enable_page_index(
1712-
enable_page_index: bool,
1713-
page_pruning_predicate: &Option<Arc<PagePruningAccessPlanFilter>>,
1714-
) -> bool {
1715-
enable_page_index
1716-
&& page_pruning_predicate.is_some()
1717-
&& page_pruning_predicate
1718-
.as_ref()
1719-
.map(|p| p.filter_number() > 0)
1720-
.unwrap_or(false)
1721-
}
1722-
17231711
#[cfg(test)]
17241712
mod test {
17251713
use std::sync::Arc;
@@ -1849,6 +1837,12 @@ mod test {
18491837
self
18501838
}
18511839

1840+
/// Enable page index.
1841+
fn with_enable_page_index(mut self, enable: bool) -> Self {
1842+
self.enable_page_index = enable;
1843+
self
1844+
}
1845+
18521846
/// Set reverse row groups flag.
18531847
fn with_reverse_row_groups(mut self, enable: bool) -> Self {
18541848
self.reverse_row_groups = enable;
@@ -2713,4 +2707,71 @@ mod test {
27132707
"Reverse scan with non-contiguous row groups should correctly map RowSelection"
27142708
);
27152709
}
2710+
2711+
/// Test that page pruning predicates are only built and applied when `enable_page_index` is true.
2712+
///
2713+
/// The file has a single row group with 10 pages (10 rows each, values 1..100).
2714+
/// With page index enabled, pages whose max value <= 90 are pruned, returning only
2715+
/// the last page (rows 91..100). With page index disabled, all 100 rows are returned
2716+
/// since neither pushdown nor row-group pruning is active.
2717+
#[tokio::test]
2718+
async fn test_page_pruning_predicate_respects_enable_page_index() {
2719+
use parquet::file::properties::WriterProperties;
2720+
2721+
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
2722+
2723+
// 100 rows with values 1..=100, written as a single row group with 10 rows per page
2724+
let values: Vec<i32> = (1..=100).collect();
2725+
let batch = record_batch!((
2726+
"a",
2727+
Int32,
2728+
values.iter().map(|v| Some(*v)).collect::<Vec<_>>()
2729+
))
2730+
.unwrap();
2731+
let props = WriterProperties::builder()
2732+
.set_data_page_row_count_limit(10)
2733+
.set_write_batch_size(10)
2734+
.build();
2735+
let schema = batch.schema();
2736+
let data_size = write_parquet_batches(
2737+
Arc::clone(&store),
2738+
"test.parquet",
2739+
vec![batch],
2740+
Some(props),
2741+
)
2742+
.await;
2743+
2744+
let file = PartitionedFile::new("test.parquet".to_string(), data_size as u64);
2745+
2746+
// predicate: a > 90 — should allow page index to prune first 9 pages
2747+
let predicate = logical2physical(&col("a").gt(lit(90i32)), &schema);
2748+
2749+
let make_opener = |enable_page_index| {
2750+
ParquetOpenerBuilder::new()
2751+
.with_store(Arc::clone(&store))
2752+
.with_schema(Arc::clone(&schema))
2753+
.with_predicate(Arc::clone(&predicate))
2754+
.with_enable_page_index(enable_page_index)
2755+
// disable pushdown and row-group pruning so the only pruning path is page index
2756+
.with_pushdown_filters(false)
2757+
.with_row_group_stats_pruning(false)
2758+
.build()
2759+
};
2760+
let (_, rows_with_page_index) = count_batches_and_rows(
2761+
make_opener(true).open(file.clone()).unwrap().await.unwrap(),
2762+
)
2763+
.await;
2764+
let (_, rows_without_page_index) =
2765+
count_batches_and_rows(make_opener(false).open(file).unwrap().await.unwrap())
2766+
.await;
2767+
2768+
assert_eq!(
2769+
rows_with_page_index, 10,
2770+
"page index should prune 9 of 10 pages"
2771+
);
2772+
assert_eq!(
2773+
rows_without_page_index, 100,
2774+
"without page index all rows are returned"
2775+
);
2776+
}
27162777
}

datafusion/datasource-parquet/src/source.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -629,17 +629,17 @@ impl FileSource for ParquetSource {
629629
write!(f, ", reverse_row_groups=true")?;
630630
}
631631

632-
// Try to build a the pruning predicates.
632+
// Try to build the pruning predicates.
633633
// These are only generated here because it's useful to have *some*
634634
// idea of what pushdown is happening when viewing plans.
635-
// However it is important to note that these predicates are *not*
635+
// However, it is important to note that these predicates are *not*
636636
// necessarily the predicates that are actually evaluated:
637637
// the actual predicates are built in reference to the physical schema of
638638
// each file, which we do not have at this point and hence cannot use.
639-
// Instead we use the logical schema of the file (the table schema without partition columns).
639+
// Instead, we use the logical schema of the file (the table schema without partition columns).
640640
if let Some(predicate) = &self.predicate {
641641
let predicate_creation_errors = Count::new();
642-
if let (Some(pruning_predicate), _) = build_pruning_predicates(
642+
if let Some(pruning_predicate) = build_pruning_predicates(
643643
Some(predicate),
644644
self.table_schema.table_schema(),
645645
&predicate_creation_errors,

0 commit comments

Comments
 (0)