Skip to content
5 changes: 5 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,11 @@ config_namespace! {
/// rows decoded.
pub enable_page_index: bool, default = true

/// (reading) If true, eagerly loads the Parquet data page level metadata (the
/// Page Index), this is mostly used by object stores where multiple loading
/// I/O calls are expensive.
pub eager_load_page_index: bool, default = false

/// (reading) If true, the parquet reader attempts to skip entire row groups based
/// on the predicate in the query and the metadata (min/max values) stored in
/// the parquet file
Expand Down
3 changes: 3 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ impl ParquetOptions {

// not in WriterProperties
enable_page_index: _,
eager_load_page_index: _,
pruning: _,
skip_metadata: _,
metadata_size_hint: _,
Expand Down Expand Up @@ -460,6 +461,7 @@ mod tests {

// not in WriterProperties, but itemizing here to not skip newly added props
enable_page_index: defaults.enable_page_index,
eager_load_page_index: defaults.eager_load_page_index,
pruning: defaults.pruning,
skip_metadata: defaults.skip_metadata,
metadata_size_hint: defaults.metadata_size_hint,
Expand Down Expand Up @@ -570,6 +572,7 @@ mod tests {

// not in WriterProperties
enable_page_index: global_options_defaults.enable_page_index,
eager_load_page_index: global_options_defaults.eager_load_page_index,
pruning: global_options_defaults.pruning,
skip_metadata: global_options_defaults.skip_metadata,
metadata_size_hint: global_options_defaults.metadata_size_hint,
Expand Down
81 changes: 79 additions & 2 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ pub(super) struct ParquetOpener {
/// Should the page index be read from parquet files, if present, to skip
/// data pages
pub enable_page_index: bool,
/// Should the page index be eagerly loaded when reading parquet metadata.
/// If false, the page index will be loaded later which means we will have 2 loading
/// phases instead of 1 when the page index is needed.
pub eager_load_page_index: bool,
/// Should the bloom filter be read from parquet, if present, to skip row
/// groups
pub enable_bloom_filter: bool,
Expand Down Expand Up @@ -157,6 +161,7 @@ impl FileOpener for ParquetOpener {
let mut predicate_file_schema = Arc::clone(&self.logical_file_schema);

let enable_page_index = self.enable_page_index;
let eager_load_page_index = self.eager_load_page_index;
#[cfg(feature = "parquet_encryption")]
let encryption_context = self.get_encryption_context();
let max_predicate_cache_size = self.max_predicate_cache_size;
Expand Down Expand Up @@ -205,7 +210,10 @@ impl FileOpener for ParquetOpener {
// unnecessary I/O. We decide later if it is needed to evaluate the
// pruning predicates. Thus default to not requesting if from the
// underlying reader.
let mut options = ArrowReaderOptions::new().with_page_index(false);
// But if eager loading is requested, we do request it now.
let request_page_index = eager_load_page_index && enable_page_index;
let mut options =
ArrowReaderOptions::new().with_page_index(request_page_index);
#[cfg(feature = "parquet_encryption")]
if let Some(fd_val) = file_decryption_properties {
options = options.with_file_decryption_properties((*fd_val).clone());
Expand Down Expand Up @@ -291,10 +299,12 @@ impl FileOpener for ParquetOpener {
&predicate_creation_errors,
);

let needs_lazy_load = !eager_load_page_index
&& should_enable_page_index(enable_page_index, &page_pruning_predicate);
// The page index is not stored inline in the parquet footer so the
// code above may not have read the page index structures yet. If we
// need them for reading and they aren't yet loaded, we need to load them now.
if should_enable_page_index(enable_page_index, &page_pruning_predicate) {
if needs_lazy_load {
reader_metadata = load_page_index(
reader_metadata,
&mut async_file_reader,
Expand Down Expand Up @@ -861,6 +871,7 @@ mod test {
pushdown_filters: false, // note that this is false!
reorder_filters: false,
enable_page_index: false,
eager_load_page_index: false,
enable_bloom_filter: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
Expand Down Expand Up @@ -934,6 +945,7 @@ mod test {
pushdown_filters: false, // note that this is false!
reorder_filters: false,
enable_page_index: false,
eager_load_page_index: false,
enable_bloom_filter: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
Expand Down Expand Up @@ -1023,6 +1035,7 @@ mod test {
pushdown_filters: false, // note that this is false!
reorder_filters: false,
enable_page_index: false,
eager_load_page_index: false,
enable_bloom_filter: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
Expand Down Expand Up @@ -1115,6 +1128,7 @@ mod test {
pushdown_filters: true, // note that this is true!
reorder_filters: true,
enable_page_index: false,
eager_load_page_index: false,
enable_bloom_filter: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: false, // note that this is false!
Expand Down Expand Up @@ -1207,6 +1221,7 @@ mod test {
pushdown_filters: false, // note that this is false!
reorder_filters: false,
enable_page_index: false,
eager_load_page_index: false,
enable_bloom_filter: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
Expand Down Expand Up @@ -1357,6 +1372,7 @@ mod test {
pushdown_filters: true,
reorder_filters: false,
enable_page_index: false,
eager_load_page_index: false,
enable_bloom_filter: false,
schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory),
enable_row_group_stats_pruning: false,
Expand Down Expand Up @@ -1387,4 +1403,65 @@ mod test {
assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 0);
assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 2);
}

#[test]
fn test_should_enable_page_index_and_eager_lazy_logic() {
use arrow::datatypes::{Field, Schema};
use datafusion_common::ScalarValue;
use datafusion_expr::col;
use datafusion_expr::lit;
use datafusion_physical_expr::planner::logical2physical;
use std::sync::Arc;

// 1) Construct a minimal file schema (only needed to produce a predicate)
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

// Build a simple predicate: a = 1
let expr = col("a").eq(lit(ScalarValue::Int32(Some(1))));
let predicate = logical2physical(&expr, &schema);

// 2) Test that build_page_pruning_predicate -> page pruning predicate is not empty
let page_pruning_pred =
crate::opener::build_page_pruning_predicate(&predicate, &schema);
// page_pruning_pred.filter_number() should be greater than 0 (i.e. at least one page-level filter)
assert!(page_pruning_pred.filter_number() > 0);

// 3) Test basic behavior of should_enable_page_index
// enable_page_index = true and page_pruning_pred present -> returns true
assert!(crate::opener::should_enable_page_index(
true,
&Some(Arc::clone(&page_pruning_pred))
));
// enable_page_index = false -> always false
assert!(!crate::opener::should_enable_page_index(
false,
&Some(Arc::clone(&page_pruning_pred))
));
// no page_pruning_pred -> always false
assert!(!crate::opener::should_enable_page_index(true, &None));

// 4) Test eager vs lazy decision logic (corresponds to logic in ParquetOpener)
let enable_page_index = true;
// eager = true => request_page_index = true; needs_lazy_load = false
let eager_load_page_index = true;
let request_page_index = eager_load_page_index && enable_page_index;
assert!(request_page_index);
let needs_lazy_load = !eager_load_page_index
&& crate::opener::should_enable_page_index(
enable_page_index,
&Some(Arc::clone(&page_pruning_pred)),
);
assert!(!needs_lazy_load);

// eager = false => request_page_index = false; needs_lazy_load should be true
let eager_load_page_index = false;
let request_page_index = eager_load_page_index && enable_page_index;
assert!(!request_page_index);
let needs_lazy_load = !eager_load_page_index
&& crate::opener::should_enable_page_index(
enable_page_index,
&Some(Arc::clone(&page_pruning_pred)),
);
assert!(needs_lazy_load);
}
}
5 changes: 5 additions & 0 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,10 @@ impl ParquetSource {
self.table_parquet_options.global.enable_page_index
}

fn enable_eager_load_page_index(&self) -> bool {
self.table_parquet_options.global.eager_load_page_index
}

/// If enabled, the reader will read by the bloom filter
pub fn with_bloom_filter_on_read(mut self, bloom_filter_on_read: bool) -> Self {
self.table_parquet_options.global.bloom_filter_on_read = bloom_filter_on_read;
Expand Down Expand Up @@ -574,6 +578,7 @@ impl FileSource for ParquetSource {
pushdown_filters: self.pushdown_filters(),
reorder_filters: self.reorder_filters(),
enable_page_index: self.enable_page_index(),
eager_load_page_index: self.enable_eager_load_page_index(),
enable_bloom_filter: self.bloom_filter_on_read(),
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
schema_adapter_factory,
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/proto/datafusion_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ message ParquetColumnOptions {
message ParquetOptions {
// Regular fields
bool enable_page_index = 1; // default = true
bool eager_load_page_index = 34; // default = false
bool pruning = 2; // default = true
bool skip_metadata = 3; // default = true
bool pushdown_filters = 5; // default = false
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
#[allow(deprecated)] // max_statistics_size
Ok(ParquetOptions {
enable_page_index: value.enable_page_index,
eager_load_page_index: value.eager_load_page_index,
pruning: value.pruning,
skip_metadata: value.skip_metadata,
metadata_size_hint: value
Expand Down
18 changes: 18 additions & 0 deletions datafusion/proto-common/src/generated/pbjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5545,6 +5545,9 @@ impl serde::Serialize for ParquetOptions {
if self.enable_page_index {
len += 1;
}
if self.eager_load_page_index {
len += 1;
}
if self.pruning {
len += 1;
}
Expand Down Expand Up @@ -5639,6 +5642,9 @@ impl serde::Serialize for ParquetOptions {
if self.enable_page_index {
struct_ser.serialize_field("enablePageIndex", &self.enable_page_index)?;
}
if self.eager_load_page_index {
struct_ser.serialize_field("eagerLoadPageIndex", &self.eager_load_page_index)?;
}
if self.pruning {
struct_ser.serialize_field("pruning", &self.pruning)?;
}
Expand Down Expand Up @@ -5809,6 +5815,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
const FIELDS: &[&str] = &[
"enable_page_index",
"enablePageIndex",
"eager_load_page_index",
"eagerLoadPageIndex",
"pruning",
"skip_metadata",
"skipMetadata",
Expand Down Expand Up @@ -5871,6 +5879,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
#[allow(clippy::enum_variant_names)]
enum GeneratedField {
EnablePageIndex,
EagerLoadPageIndex,
Pruning,
SkipMetadata,
PushdownFilters,
Expand Down Expand Up @@ -5923,6 +5932,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
{
match value {
"enablePageIndex" | "enable_page_index" => Ok(GeneratedField::EnablePageIndex),
"eagerLoadPageIndex" | "eager_load_page_index" => Ok(GeneratedField::EagerLoadPageIndex),
"pruning" => Ok(GeneratedField::Pruning),
"skipMetadata" | "skip_metadata" => Ok(GeneratedField::SkipMetadata),
"pushdownFilters" | "pushdown_filters" => Ok(GeneratedField::PushdownFilters),
Expand Down Expand Up @@ -5973,6 +5983,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
V: serde::de::MapAccess<'de>,
{
let mut enable_page_index__ = None;
let mut eager_load_page_index__ = None;
let mut pruning__ = None;
let mut skip_metadata__ = None;
let mut pushdown_filters__ = None;
Expand Down Expand Up @@ -6011,6 +6022,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
}
enable_page_index__ = Some(map_.next_value()?);
}
GeneratedField::EagerLoadPageIndex => {
if eager_load_page_index__.is_some() {
return Err(serde::de::Error::duplicate_field("eagerLoadPageIndex"));
}
eager_load_page_index__ = Some(map_.next_value()?);
}
GeneratedField::Pruning => {
if pruning__.is_some() {
return Err(serde::de::Error::duplicate_field("pruning"));
Expand Down Expand Up @@ -6209,6 +6226,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
}
Ok(ParquetOptions {
enable_page_index: enable_page_index__.unwrap_or_default(),
eager_load_page_index: eager_load_page_index__.unwrap_or_default(),
pruning: pruning__.unwrap_or_default(),
skip_metadata: skip_metadata__.unwrap_or_default(),
pushdown_filters: pushdown_filters__.unwrap_or_default(),
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto-common/src/generated/prost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,9 @@ pub struct ParquetOptions {
/// default = true
#[prost(bool, tag = "1")]
pub enable_page_index: bool,
/// default = false
#[prost(bool, tag = "34")]
pub eager_load_page_index: bool,
/// default = true
#[prost(bool, tag = "2")]
pub pruning: bool,
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/src/to_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
fn try_from(value: &ParquetOptions) -> datafusion_common::Result<Self, Self::Error> {
Ok(protobuf::ParquetOptions {
enable_page_index: value.enable_page_index,
eager_load_page_index: value.eager_load_page_index,
pruning: value.pruning,
skip_metadata: value.skip_metadata,
metadata_size_hint_opt: value.metadata_size_hint.map(|v| protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v as u64)),
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/datafusion_proto_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,9 @@ pub struct ParquetOptions {
/// default = true
#[prost(bool, tag = "1")]
pub enable_page_index: bool,
/// default = false
#[prost(bool, tag = "34")]
pub eager_load_page_index: bool,
/// default = true
#[prost(bool, tag = "2")]
pub pruning: bool,
Expand Down
2 changes: 2 additions & 0 deletions datafusion/proto/src/logical_plan/file_formats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ mod parquet {
TableParquetOptionsProto {
global: Some(ParquetOptionsProto {
enable_page_index: global_options.global.enable_page_index,
eager_load_page_index: global_options.global.eager_load_page_index,
pruning: global_options.global.pruning,
skip_metadata: global_options.global.skip_metadata,
metadata_size_hint_opt: global_options.global.metadata_size_hint.map(|size| {
Expand Down Expand Up @@ -468,6 +469,7 @@ mod parquet {
#[allow(deprecated)] // max_statistics_size
ParquetOptions {
enable_page_index: proto.enable_page_index,
eager_load_page_index: proto.eager_load_page_index,
pruning: proto.pruning,
skip_metadata: proto.skip_metadata,
metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(|opt| match opt {
Expand Down
Loading