5 changes: 4 additions & 1 deletion datafusion/common/src/config.rs
@@ -620,7 +620,10 @@ config_namespace! {
/// bytes of the parquet file optimistically. If not specified, two reads are required:
/// One read to fetch the 8-byte parquet footer and
/// another to fetch the metadata length encoded in the footer
pub metadata_size_hint: Option<usize>, default = None
/// The default is 512 KiB, which should be large enough for most parquet files'
/// metadata and saves one I/O operation per parquet file. If the metadata is larger
/// than the hint, two reads will still be performed.
pub metadata_size_hint: Option<usize>, default = Some(512 * 1024)

/// (reading) If true, filter expressions are applied during the parquet decoding operation to
/// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
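For context, a minimal sketch of how the new global default could be adjusted when building a session; the 1 MiB value and the table/file names are illustrative, and the field path simply mirrors the `datafusion.execution.parquet.metadata_size_hint` entry above.

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Raise the global hint to 1 MiB for files with unusually large metadata.
    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.metadata_size_hint = Some(1024 * 1024);

    let ctx = SessionContext::new_with_config(config);
    ctx.register_parquet("t", "data.parquet", ParquetReadOptions::default())
        .await?;
    Ok(())
}
```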
14 changes: 14 additions & 0 deletions datafusion/core/src/datasource/file_format/options.rs
@@ -269,6 +269,8 @@ pub struct ParquetReadOptions<'a> {
pub file_sort_order: Vec<Vec<SortExpr>>,
/// Properties for decryption of Parquet files that use modular encryption
pub file_decryption_properties: Option<ConfigFileDecryptionProperties>,
/// Metadata size hint (in bytes) for reading Parquet files
pub metadata_size_hint: Option<usize>,
}

impl Default for ParquetReadOptions<'_> {
@@ -281,6 +283,7 @@ impl Default for ParquetReadOptions<'_> {
schema: None,
file_sort_order: vec![],
file_decryption_properties: None,
metadata_size_hint: None,
}
}
}
@@ -340,6 +343,12 @@ impl<'a> ParquetReadOptions<'a> {
self.file_decryption_properties = Some(file_decryption_properties);
self
}

/// Configure the metadata size hint (in bytes) for reading Parquet files
pub fn metadata_size_hint(mut self, size_hint: Option<usize>) -> Self {
self.metadata_size_hint = size_hint;
self
}
}

/// Options that control the reading of ARROW files.
@@ -606,6 +615,11 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
if let Some(file_decryption_properties) = &self.file_decryption_properties {
options.crypto.file_decryption = Some(file_decryption_properties.clone());
}
// If set, the per-read hint from ParquetReadOptions overrides the global option.
if let Some(metadata_size_hint) = self.metadata_size_hint {
options.global.metadata_size_hint = Some(metadata_size_hint);
}

let mut file_format = ParquetFormat::new().with_options(options);

if let Some(parquet_pruning) = self.parquet_pruning {
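A short usage sketch of the new per-read builder added above; the table and file names are placeholders, and it simply chains `ParquetReadOptions::metadata_size_hint` into `register_parquet`, as the tests later in this diff do.

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Override the global hint for this table only: optimistically fetch
    // the last 64 KiB of each file when reading its metadata.
    let options = ParquetReadOptions::default().metadata_size_hint(Some(64 * 1024));
    ctx.register_parquet("my_table", "data/my_table.parquet", options)
        .await?;
    Ok(())
}
```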
7 changes: 5 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -546,7 +546,8 @@ mod tests {
let (files, _file_names) = store_parquet(vec![batch1], false).await?;

let state = SessionContext::new().state();
let format = ParquetFormat::default();
// Set the metadata size hint to None to keep the original (two-read) behavior
let format = ParquetFormat::default().with_metadata_size_hint(None);
let _schema = format.infer_schema(&state, &store.upcast(), &files).await?;
assert_eq!(store.request_count(), 3);
// No increase, cache being used.
@@ -620,7 +621,9 @@ mod tests {

let mut state = SessionContext::new().state();
state = set_view_state(state, force_views);
let format = ParquetFormat::default().with_force_view_types(force_views);
let format = ParquetFormat::default()
.with_force_view_types(force_views)
.with_metadata_size_hint(None);
let schema = format.infer_schema(&state, &store.upcast(), &files).await?;
assert_eq!(store.request_count(), 6);

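The tests above pin the old two-read behavior via `ParquetFormat::with_metadata_size_hint(None)`; below is a small sketch of doing the same for a listing table outside the test suite. The helper name and file extension are illustrative.

```rust
use std::sync::Arc;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;

// Clear the hint so the reader falls back to the original
// footer-then-metadata pair of reads.
fn parquet_listing_options_without_hint() -> ListingOptions {
    let format = ParquetFormat::default().with_metadata_size_hint(None);
    ListingOptions::new(Arc::new(format)).with_file_extension(".parquet")
}
```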
59 changes: 49 additions & 10 deletions datafusion/core/tests/datasource/object_store_access.rs
@@ -27,7 +27,7 @@
use arrow::array::{ArrayRef, Int32Array, RecordBatch};
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::prelude::{CsvReadOptions, SessionContext};
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext};
use futures::stream::BoxStream;
use insta::assert_snapshot;
use object_store::memory::InMemory;
@@ -121,8 +121,15 @@ async fn query_multi_csv_file() {

#[tokio::test]
async fn create_single_parquet_file() {
// Note that without a metadata size hint, the parquet reader
// must read the file footer to determine row group locations,
// and the registration process also issues a HEAD request to
// determine the file size.
// We set the hint to 1 here to emulate the no-hint case: passing None
// leaves the per-read option unset, so it would not override the global
// default of 512 * 1024.
assert_snapshot!(
single_file_parquet_test().await.requests(),
single_file_parquet_test(Some(1)).await.requests(),
@r"
RequestCountingObjectStore()
Total Requests: 4
@@ -134,10 +141,26 @@
);
}

#[tokio::test]
async fn create_single_parquet_file_with_metadata_size_hint() {
// Registering a parquet file can be optimized with a metadata size hint,
// reducing the number of requests from 4 to 2.
// This is now the default behavior for DataFusion.
assert_snapshot!(
single_file_parquet_test(Some(512 * 1024)).await.requests(),
@r"
RequestCountingObjectStore()
Total Requests: 2
- HEAD path=parquet_table.parquet
- GET (range) range=0-2994 path=parquet_table.parquet
"
);
}

#[tokio::test]
async fn query_single_parquet_file() {
assert_snapshot!(
single_file_parquet_test().await.query("select count(distinct a), count(b) from parquet_table").await,
single_file_parquet_test(None).await.query("select count(distinct a), count(b) from parquet_table").await,
@r"
------- Query Output (1 rows) -------
+---------------------------------+------------------------+
@@ -160,7 +183,7 @@ async fn query_single_parquet_file_with_single_predicate() {
// Note that evaluating predicates requires additional object store requests
// (to evaluate predicates)
assert_snapshot!(
single_file_parquet_test().await.query("select min(a), max(b) from parquet_table WHERE a > 150").await,
single_file_parquet_test(None).await.query("select min(a), max(b) from parquet_table WHERE a > 150").await,
@r"
------- Query Output (1 rows) -------
+----------------------+----------------------+
@@ -182,7 +205,7 @@ async fn query_single_parquet_file_multi_row_groups_multiple_predicates() {
// Note that evaluating predicates requires additional object store requests
// (to evaluate predicates)
assert_snapshot!(
single_file_parquet_test().await.query("select min(a), max(b) from parquet_table WHERE a > 50 AND b < 1150").await,
single_file_parquet_test(None).await.query("select min(a), max(b) from parquet_table WHERE a > 50 AND b < 1150").await,
@r"
------- Query Output (1 rows) -------
+----------------------+----------------------+
@@ -242,7 +265,7 @@ async fn multi_file_csv_test() -> Test {
///
/// Column "b": Int32 with values [1000-1100] in row group 1
/// and [1101-1200] in row group 2
async fn single_file_parquet_test() -> Test {
async fn single_file_parquet_test(metadata_size_hint: Option<usize>) -> Test {
// Create parquet bytes
let a: ArrayRef = Arc::new(Int32Array::from_iter_values(0..200));
let b: ArrayRef = Arc::new(Int32Array::from_iter_values(1000..1200));
@@ -261,7 +284,11 @@ async fn single_file_parquet_test() -> Test {
Test::new()
.with_bytes("/parquet_table.parquet", buffer)
.await
.register_parquet("parquet_table", "/parquet_table.parquet")
.register_parquet(
"parquet_table",
"/parquet_table.parquet",
metadata_size_hint,
)
.await
}

@@ -312,11 +339,23 @@ impl Test {
self
}

/// Register a CSV file at the given path relative to the [`datafusion_test_data`] directory
async fn register_parquet(self, table_name: &str, path: &str) -> Self {
/// Register a Parquet file at the given path relative to the [`datafusion_test_data`] directory.
/// Takes a `metadata_size_hint` to make it easy to test parquet metadata read optimizations.
async fn register_parquet(
self,
table_name: &str,
path: &str,
metadata_size_hint: Option<usize>,
) -> Self {
let path = format!("mem://{path}");
let options: ParquetReadOptions<'_> = Default::default();

self.session_context
.register_parquet(table_name, path, Default::default())
.register_parquet(
table_name,
path,
options.metadata_size_hint(metadata_size_hint),
)
.await
.unwrap();
self
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/information_schema.slt
@@ -246,7 +246,7 @@ datafusion.execution.parquet.max_predicate_cache_size NULL
datafusion.execution.parquet.max_row_group_size 1048576
datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2
datafusion.execution.parquet.maximum_parallel_row_group_writers 1
datafusion.execution.parquet.metadata_size_hint NULL
datafusion.execution.parquet.metadata_size_hint 524288
datafusion.execution.parquet.pruning true
datafusion.execution.parquet.pushdown_filters false
datafusion.execution.parquet.reorder_filters false
@@ -366,7 +366,7 @@ datafusion.execution.parquet.max_predicate_cache_size NULL (reading) The maximum
datafusion.execution.parquet.max_row_group_size 1048576 (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read.
datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame.
datafusion.execution.parquet.maximum_parallel_row_group_writers 1 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame.
datafusion.execution.parquet.metadata_size_hint NULL (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer
datafusion.execution.parquet.metadata_size_hint 524288 (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer The default is 512 KiB, which should be large enough for most parquet files' metadata and saves one I/O operation per parquet file. If the metadata is larger than the hint, two reads will still be performed.
datafusion.execution.parquet.pruning true (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file
datafusion.execution.parquet.pushdown_filters false (reading) If true, filter expressions are applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization".
datafusion.execution.parquet.reorder_filters false (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query
Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
@@ -81,7 +81,7 @@ The following configuration settings are available:
| datafusion.execution.parquet.enable_page_index | true | (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. |
| datafusion.execution.parquet.pruning | true | (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file |
| datafusion.execution.parquet.skip_metadata | true | (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata |
| datafusion.execution.parquet.metadata_size_hint | NULL | (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer |
| datafusion.execution.parquet.metadata_size_hint | 524288 | (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer The default is 512 KiB, which should be large enough for most parquet files' metadata and saves one I/O operation per parquet file. If the metadata is larger than the hint, two reads will still be performed. |
| datafusion.execution.parquet.pushdown_filters | false | (reading) If true, filter expressions are applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". |
| datafusion.execution.parquet.reorder_filters | false | (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query |
| datafusion.execution.parquet.schema_force_view_types | true | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. |