Skip to content
5 changes: 5 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,11 @@ config_namespace! {
/// rows decoded.
pub enable_page_index: bool, default = true

/// (reading) If true, eagerly loads the Parquet data page level metadata (the
/// Page Index), this is mostly used by object stores where multiple loading
/// I/O calls are expensive.
pub eager_load_page_index: bool, default = false

/// (reading) If true, the parquet reader attempts to skip entire row groups based
/// on the predicate in the query and the metadata (min/max values) stored in
/// the parquet file
Expand Down
3 changes: 3 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ impl ParquetOptions {

// not in WriterProperties
enable_page_index: _,
eager_load_page_index: _,
pruning: _,
skip_metadata: _,
metadata_size_hint: _,
Expand Down Expand Up @@ -460,6 +461,7 @@ mod tests {

// not in WriterProperties, but itemizing here to not skip newly added props
enable_page_index: defaults.enable_page_index,
eager_load_page_index: defaults.eager_load_page_index,
pruning: defaults.pruning,
skip_metadata: defaults.skip_metadata,
metadata_size_hint: defaults.metadata_size_hint,
Expand Down Expand Up @@ -570,6 +572,7 @@ mod tests {

// not in WriterProperties
enable_page_index: global_options_defaults.enable_page_index,
eager_load_page_index: global_options_defaults.eager_load_page_index,
pruning: global_options_defaults.pruning,
skip_metadata: global_options_defaults.skip_metadata,
metadata_size_hint: global_options_defaults.metadata_size_hint,
Expand Down
81 changes: 79 additions & 2 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ pub(super) struct ParquetOpener {
/// Should the page index be read from parquet files, if present, to skip
/// data pages
pub enable_page_index: bool,
/// Should the page index be eagerly loaded when reading parquet metadata.
/// If false, the page index will be loaded later which means we will have 2 loading
/// phases instead of 1 when the page index is needed.
pub eager_load_page_index: bool,
/// Should the bloom filter be read from parquet, if present, to skip row
/// groups
pub enable_bloom_filter: bool,
Expand Down Expand Up @@ -157,6 +161,7 @@ impl FileOpener for ParquetOpener {
let mut predicate_file_schema = Arc::clone(&self.logical_file_schema);

let enable_page_index = self.enable_page_index;
let eager_load_page_index = self.eager_load_page_index;
#[cfg(feature = "parquet_encryption")]
let encryption_context = self.get_encryption_context();
let max_predicate_cache_size = self.max_predicate_cache_size;
Expand Down Expand Up @@ -205,7 +210,10 @@ impl FileOpener for ParquetOpener {
// unnecessary I/O. We decide later if it is needed to evaluate the
// pruning predicates. Thus default to not requesting if from the
// underlying reader.
let mut options = ArrowReaderOptions::new().with_page_index(false);
// But if eager loading is requested, we do request it now.
let request_page_index = eager_load_page_index && enable_page_index;
let mut options =
ArrowReaderOptions::new().with_page_index(request_page_index);
#[cfg(feature = "parquet_encryption")]
if let Some(fd_val) = file_decryption_properties {
options = options.with_file_decryption_properties((*fd_val).clone());
Expand Down Expand Up @@ -291,10 +299,12 @@ impl FileOpener for ParquetOpener {
&predicate_creation_errors,
);

let needs_lazy_load = !eager_load_page_index
&& should_enable_page_index(enable_page_index, &page_pruning_predicate);
// The page index is not stored inline in the parquet footer so the
// code above may not have read the page index structures yet. If we
// need them for reading and they aren't yet loaded, we need to load them now.
if should_enable_page_index(enable_page_index, &page_pruning_predicate) {
if needs_lazy_load {
reader_metadata = load_page_index(
reader_metadata,
&mut async_file_reader,
Expand Down Expand Up @@ -861,6 +871,7 @@ mod test {
pushdown_filters: false, // note that this is false!
reorder_filters: false,
enable_page_index: false,
eager_load_page_index: false,
enable_bloom_filter: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
Expand Down Expand Up @@ -934,6 +945,7 @@ mod test {
pushdown_filters: false, // note that this is false!
reorder_filters: false,
enable_page_index: false,
eager_load_page_index: false,
enable_bloom_filter: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
Expand Down Expand Up @@ -1023,6 +1035,7 @@ mod test {
pushdown_filters: false, // note that this is false!
reorder_filters: false,
enable_page_index: false,
eager_load_page_index: false,
enable_bloom_filter: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
Expand Down Expand Up @@ -1115,6 +1128,7 @@ mod test {
pushdown_filters: true, // note that this is true!
reorder_filters: true,
enable_page_index: false,
eager_load_page_index: false,
enable_bloom_filter: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: false, // note that this is false!
Expand Down Expand Up @@ -1207,6 +1221,7 @@ mod test {
pushdown_filters: false, // note that this is false!
reorder_filters: false,
enable_page_index: false,
eager_load_page_index: false,
enable_bloom_filter: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
Expand Down Expand Up @@ -1357,6 +1372,7 @@ mod test {
pushdown_filters: true,
reorder_filters: false,
enable_page_index: false,
eager_load_page_index: false,
enable_bloom_filter: false,
schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory),
enable_row_group_stats_pruning: false,
Expand Down Expand Up @@ -1387,4 +1403,65 @@ mod test {
assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 0);
assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 2);
}

#[test]
fn test_should_enable_page_index_and_eager_lazy_logic() {
use arrow::datatypes::{Field, Schema};
use datafusion_common::ScalarValue;
use datafusion_expr::col;
use datafusion_expr::lit;
use datafusion_physical_expr::planner::logical2physical;
use std::sync::Arc;

// 1) Construct a minimal file schema (only needed to produce a predicate)
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

// Build a simple predicate: a = 1
let expr = col("a").eq(lit(ScalarValue::Int32(Some(1))));
let predicate = logical2physical(&expr, &schema);

// 2) Test that build_page_pruning_predicate -> page pruning predicate is not empty
let page_pruning_pred =
crate::opener::build_page_pruning_predicate(&predicate, &schema);
// page_pruning_pred.filter_number() should be greater than 0 (i.e. at least one page-level filter)
assert!(page_pruning_pred.filter_number() > 0);

// 3) Test basic behavior of should_enable_page_index
// enable_page_index = true and page_pruning_pred present -> returns true
assert!(crate::opener::should_enable_page_index(
true,
&Some(Arc::clone(&page_pruning_pred))
));
// enable_page_index = false -> always false
assert!(!crate::opener::should_enable_page_index(
false,
&Some(Arc::clone(&page_pruning_pred))
));
// no page_pruning_pred -> always false
assert!(!crate::opener::should_enable_page_index(true, &None));

// 4) Test eager vs lazy decision logic (corresponds to logic in ParquetOpener)
let enable_page_index = true;
// eager = true => request_page_index = true; needs_lazy_load = false
let eager_load_page_index = true;
let request_page_index = eager_load_page_index && enable_page_index;
assert!(request_page_index);
let needs_lazy_load = !eager_load_page_index
&& crate::opener::should_enable_page_index(
enable_page_index,
&Some(Arc::clone(&page_pruning_pred)),
);
assert!(!needs_lazy_load);

// eager = false => request_page_index = false; needs_lazy_load should be true
let eager_load_page_index = false;
let request_page_index = eager_load_page_index && enable_page_index;
assert!(!request_page_index);
let needs_lazy_load = !eager_load_page_index
&& crate::opener::should_enable_page_index(
enable_page_index,
&Some(Arc::clone(&page_pruning_pred)),
);
assert!(needs_lazy_load);
}
}
5 changes: 5 additions & 0 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,10 @@ impl ParquetSource {
self.table_parquet_options.global.enable_page_index
}

fn enable_eager_load_page_index(&self) -> bool {
self.table_parquet_options.global.eager_load_page_index
}

/// If enabled, the reader will read by the bloom filter
pub fn with_bloom_filter_on_read(mut self, bloom_filter_on_read: bool) -> Self {
self.table_parquet_options.global.bloom_filter_on_read = bloom_filter_on_read;
Expand Down Expand Up @@ -574,6 +578,7 @@ impl FileSource for ParquetSource {
pushdown_filters: self.pushdown_filters(),
reorder_filters: self.reorder_filters(),
enable_page_index: self.enable_page_index(),
eager_load_page_index: self.enable_eager_load_page_index(),
enable_bloom_filter: self.bloom_filter_on_read(),
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
schema_adapter_factory,
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/proto/datafusion_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ message ParquetColumnOptions {
message ParquetOptions {
// Regular fields
bool enable_page_index = 1; // default = true
bool eager_load_page_index = 34; // default = false
bool pruning = 2; // default = true
bool skip_metadata = 3; // default = true
bool pushdown_filters = 5; // default = false
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
#[allow(deprecated)] // max_statistics_size
Ok(ParquetOptions {
enable_page_index: value.enable_page_index,
eager_load_page_index: value.eager_load_page_index,
pruning: value.pruning,
skip_metadata: value.skip_metadata,
metadata_size_hint: value
Expand Down
18 changes: 18 additions & 0 deletions datafusion/proto-common/src/generated/pbjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5545,6 +5545,9 @@ impl serde::Serialize for ParquetOptions {
if self.enable_page_index {
len += 1;
}
if self.eager_load_page_index {
len += 1;
}
if self.pruning {
len += 1;
}
Expand Down Expand Up @@ -5639,6 +5642,9 @@ impl serde::Serialize for ParquetOptions {
if self.enable_page_index {
struct_ser.serialize_field("enablePageIndex", &self.enable_page_index)?;
}
if self.eager_load_page_index {
struct_ser.serialize_field("eagerLoadPageIndex", &self.eager_load_page_index)?;
}
if self.pruning {
struct_ser.serialize_field("pruning", &self.pruning)?;
}
Expand Down Expand Up @@ -5809,6 +5815,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
const FIELDS: &[&str] = &[
"enable_page_index",
"enablePageIndex",
"eager_load_page_index",
"eagerLoadPageIndex",
"pruning",
"skip_metadata",
"skipMetadata",
Expand Down Expand Up @@ -5871,6 +5879,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
#[allow(clippy::enum_variant_names)]
enum GeneratedField {
EnablePageIndex,
EagerLoadPageIndex,
Pruning,
SkipMetadata,
PushdownFilters,
Expand Down Expand Up @@ -5923,6 +5932,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
{
match value {
"enablePageIndex" | "enable_page_index" => Ok(GeneratedField::EnablePageIndex),
"eagerLoadPageIndex" | "eager_load_page_index" => Ok(GeneratedField::EagerLoadPageIndex),
"pruning" => Ok(GeneratedField::Pruning),
"skipMetadata" | "skip_metadata" => Ok(GeneratedField::SkipMetadata),
"pushdownFilters" | "pushdown_filters" => Ok(GeneratedField::PushdownFilters),
Expand Down Expand Up @@ -5973,6 +5983,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
V: serde::de::MapAccess<'de>,
{
let mut enable_page_index__ = None;
let mut eager_load_page_index__ = None;
let mut pruning__ = None;
let mut skip_metadata__ = None;
let mut pushdown_filters__ = None;
Expand Down Expand Up @@ -6011,6 +6022,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
}
enable_page_index__ = Some(map_.next_value()?);
}
GeneratedField::EagerLoadPageIndex => {
if eager_load_page_index__.is_some() {
return Err(serde::de::Error::duplicate_field("eagerLoadPageIndex"));
}
eager_load_page_index__ = Some(map_.next_value()?);
}
GeneratedField::Pruning => {
if pruning__.is_some() {
return Err(serde::de::Error::duplicate_field("pruning"));
Expand Down Expand Up @@ -6209,6 +6226,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
}
Ok(ParquetOptions {
enable_page_index: enable_page_index__.unwrap_or_default(),
eager_load_page_index: eager_load_page_index__.unwrap_or_default(),
pruning: pruning__.unwrap_or_default(),
skip_metadata: skip_metadata__.unwrap_or_default(),
pushdown_filters: pushdown_filters__.unwrap_or_default(),
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto-common/src/generated/prost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,9 @@ pub struct ParquetOptions {
/// default = true
#[prost(bool, tag = "1")]
pub enable_page_index: bool,
/// default = false
#[prost(bool, tag = "34")]
pub eager_load_page_index: bool,
/// default = true
#[prost(bool, tag = "2")]
pub pruning: bool,
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/src/to_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
fn try_from(value: &ParquetOptions) -> datafusion_common::Result<Self, Self::Error> {
Ok(protobuf::ParquetOptions {
enable_page_index: value.enable_page_index,
eager_load_page_index: value.eager_load_page_index,
pruning: value.pruning,
skip_metadata: value.skip_metadata,
metadata_size_hint_opt: value.metadata_size_hint.map(|v| protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v as u64)),
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/datafusion_proto_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,9 @@ pub struct ParquetOptions {
/// default = true
#[prost(bool, tag = "1")]
pub enable_page_index: bool,
/// default = false
#[prost(bool, tag = "34")]
pub eager_load_page_index: bool,
/// default = true
#[prost(bool, tag = "2")]
pub pruning: bool,
Expand Down
2 changes: 2 additions & 0 deletions datafusion/proto/src/logical_plan/file_formats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ mod parquet {
TableParquetOptionsProto {
global: Some(ParquetOptionsProto {
enable_page_index: global_options.global.enable_page_index,
eager_load_page_index: global_options.global.eager_load_page_index,
pruning: global_options.global.pruning,
skip_metadata: global_options.global.skip_metadata,
metadata_size_hint_opt: global_options.global.metadata_size_hint.map(|size| {
Expand Down Expand Up @@ -464,6 +465,7 @@ mod parquet {
#[allow(deprecated)] // max_statistics_size
ParquetOptions {
enable_page_index: proto.enable_page_index,
eager_load_page_index: proto.eager_load_page_index,
pruning: proto.pruning,
skip_metadata: proto.skip_metadata,
metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(|opt| match opt {
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ datafusion.execution.parquet.data_page_row_count_limit 20000
datafusion.execution.parquet.data_pagesize_limit 1048576
datafusion.execution.parquet.dictionary_enabled true
datafusion.execution.parquet.dictionary_page_size_limit 1048576
datafusion.execution.parquet.eager_load_page_index false
datafusion.execution.parquet.enable_page_index true
datafusion.execution.parquet.encoding NULL
datafusion.execution.parquet.max_predicate_cache_size NULL
Expand Down Expand Up @@ -359,6 +360,7 @@ datafusion.execution.parquet.data_page_row_count_limit 20000 (writing) Sets best
datafusion.execution.parquet.data_pagesize_limit 1048576 (writing) Sets best effort maximum size of data page in bytes
datafusion.execution.parquet.dictionary_enabled true (writing) Sets if dictionary encoding is enabled. If NULL, uses default parquet writer setting
datafusion.execution.parquet.dictionary_page_size_limit 1048576 (writing) Sets best effort maximum dictionary page size, in bytes
datafusion.execution.parquet.eager_load_page_index false (reading) If true, eagerly loads the Parquet data page level metadata (the Page Index), this is mostly used by object stores where multiple loading I/O calls are expensive.
datafusion.execution.parquet.enable_page_index true (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded.
datafusion.execution.parquet.encoding NULL (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting
datafusion.execution.parquet.max_predicate_cache_size NULL (reading) The maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching.
Expand Down
Loading
Loading