datafusion/datasource-parquet/src/access_plan.rs (365 additions, 0 deletions)
@@ -16,7 +16,12 @@
// under the License.

use crate::sort::reverse_row_selection;
use arrow::datatypes::Schema;
use datafusion_common::{Result, assert_eq_or_internal_err};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use log::debug;
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};

@@ -377,6 +382,120 @@ impl PreparedAccessPlan {
})
}

/// Reorder row groups by their min statistics for the given sort order.
///
/// This helps TopK queries find optimal values first. For ASC sort,
/// row groups with the smallest min values come first. For DESC sort,
/// row groups with the largest min values come first.
///
/// Gracefully skips reordering when:
/// - There is a row_selection (too complex to remap)
/// - 0 or 1 row groups (nothing to reorder)
/// - Sort expression is not a simple column reference
/// - Statistics are unavailable
pub(crate) fn reorder_by_statistics(
Contributor @Dandandan commented on Apr 13, 2026:
I think @adriangb had the great idea to also order by grouping keys which can

  • reduce cardinality within partitions (partition-local state can be smaller)
  • allow for better cache locality (row groups with more equal keys are grouped together)

Doesn't have to be in this PR but perhaps we can think about how it fits in.
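The idea could be sketched roughly as follows (a hypothetical helper, not part of this PR or the DataFusion API): sorting the selected row-group indexes by the minimum statistic of a grouping key clusters row groups with similar key ranges, which is what shrinks partition-local state and improves cache locality.

```rust
// Hypothetical sketch (not in this PR): cluster row groups by the min
// statistic of a grouping key so that groups with similar key ranges are
// scanned consecutively.
fn reorder_by_key_min(rg_indexes: &[usize], key_mins: &[i32]) -> Vec<usize> {
    // Argsort positions by the key's min value, then map back to RG indexes.
    let mut order: Vec<usize> = (0..rg_indexes.len()).collect();
    order.sort_by_key(|&i| key_mins[i]);
    order.into_iter().map(|i| rg_indexes[i]).collect()
}

fn main() {
    // RG1 (min 10) and RG2 (min 30) become adjacent, ahead of RG0 (min 50).
    assert_eq!(reorder_by_key_min(&[0, 1, 2], &[50, 10, 30]), vec![1, 2, 0]);
}
```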

Contributor (PR author) replied:
Thanks @Dandandan for review! That's a great extension. The reorder_by_statistics method is generic enough to take any LexOrdering — it doesn't need to be tied to TopK specifically. So extending this for GROUP BY should be a matter of:

  1. Computing a preferred RG ordering from grouping keys in the aggregate planner
  2. Passing it through to ParquetSource::sort_order_for_reorder

Happy to track this as a follow-up issue. Will open one after this PR lands.

Contributor (PR author) replied:
Thanks @Dandandan! Created #21581 to track this. The existing infrastructure from this PR should be directly reusable — mainly needs the aggregate planner to populate sort_order_for_reorder from grouping keys.

mut self,
sort_order: &LexOrdering,
file_metadata: &ParquetMetaData,
arrow_schema: &Schema,
) -> Result<Self> {
// Skip if row_selection present (too complex to remap)
if self.row_selection.is_some() {
debug!("Skipping RG reorder: row_selection present");
return Ok(self);
}

// Nothing to reorder
if self.row_group_indexes.len() <= 1 {
return Ok(self);
}

// Get the first sort expression
// LexOrdering is guaranteed non-empty, so first() returns &PhysicalSortExpr
let first_sort_expr = sort_order.first();
Copilot AI commented on Apr 13, 2026 (on lines +414 to +415):
sort_order.first() (if LexOrdering is Vec-like) returns Option<&PhysicalSortExpr>, but the code uses it as if it were &PhysicalSortExpr (first_sort_expr.expr...). This is likely a compile error. A concrete fix is to obtain the first element via iteration and handle the empty case (e.g., early-return Ok(self) if no sort expressions), then use the returned &PhysicalSortExpr.

Suggested change
-        // LexOrdering is guaranteed non-empty, so first() returns &PhysicalSortExpr
-        let first_sort_expr = sort_order.first();
+        let first_sort_expr = match sort_order.iter().next() {
+            Some(expr) => expr,
+            None => {
+                debug!("Skipping RG reorder: empty sort order");
+                return Ok(self);
+            }
+        };


// Extract column name from sort expression
let column: &Column = match first_sort_expr.expr.as_any().downcast_ref::<Column>()
{
Some(col) => col,
None => {
debug!("Skipping RG reorder: sort expr is not a simple column");
return Ok(self);
}
};

let descending = first_sort_expr.options.descending;
Copilot AI commented on Apr 13, 2026:
For DESC ordering, reordering by min values is often a poor proxy for “row group likely contains the largest values first”; typically you want to sort by max when descending == true (and by min when ascending). This can significantly reduce the intended TopK benefit (and can even choose a worse first row group when ranges overlap). Consider switching to row_group_maxs(...) for descending order, and update the doc comment (currently mentions “min/max”) and the DESC unit test accordingly.
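A toy illustration of this point (helper name is made up for the example): with RG0 spanning 50-60 and RG1 spanning 40-100, ordering by min DESC visits RG0 first, while ordering by max DESC visits RG1 first — the row group that actually contains the largest values.

```rust
use std::cmp::Reverse;

// Illustrative only: order row-group positions descending by one statistic.
fn desc_order(stats: &[i32]) -> Vec<usize> {
    let mut order: Vec<usize> = (0..stats.len()).collect();
    order.sort_by_key(|&i| Reverse(stats[i]));
    order
}

fn main() {
    // RG0: 50-60, RG1: 40-100 (overlapping ranges).
    let mins = [50, 40];
    let maxes = [60, 100];
    // min-based DESC order reads RG0 first, whose largest value is only 60.
    assert_eq!(desc_order(&mins), vec![0, 1]);
    // max-based DESC order reads RG1 first, which holds values up to 100.
    assert_eq!(desc_order(&maxes), vec![1, 0]);
}
```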


// Build statistics converter for this column
let converter = match StatisticsConverter::try_new(
column.name(),
arrow_schema,
file_metadata.file_metadata().schema_descr(),
) {
Ok(c) => c,
Err(e) => {
debug!("Skipping RG reorder: cannot create stats converter: {e}");
return Ok(self);
}
};

// Get the relevant statistics for the selected row groups.
// For ASC sort: use min values — we want the RG with the smallest min
// to come first (best candidate for "smallest values").
// For DESC sort: use max values — we want the RG with the largest max
// to come first (best candidate for "largest values"). Using min for
// DESC can pick a worse first RG when ranges overlap (e.g., RG0 50-60
// vs RG1 40-100 — RG1 has larger values but smaller min).
let rg_metadata: Vec<&RowGroupMetaData> = self
.row_group_indexes
.iter()
.map(|&idx| file_metadata.row_group(idx))
.collect();

let stat_values = if descending {
match converter.row_group_maxes(rg_metadata.iter().copied()) {
Ok(vals) => vals,
Err(e) => {
debug!("Skipping RG reorder: cannot get max values: {e}");
return Ok(self);
}
}
} else {
match converter.row_group_mins(rg_metadata.iter().copied()) {
Ok(vals) => vals,
Err(e) => {
debug!("Skipping RG reorder: cannot get min values: {e}");
return Ok(self);
}
}
};

// Sort indices by statistic values (min for ASC, max for DESC)
let sort_options = arrow::compute::SortOptions {
descending,
nulls_first: first_sort_expr.options.nulls_first,
};
let sorted_indices =
match arrow::compute::sort_to_indices(&stat_values, Some(sort_options), None)
{
Ok(indices) => indices,
Err(e) => {
debug!("Skipping RG reorder: sort failed: {e}");
return Ok(self);
}
};

// Apply the reordering
let original_indexes = self.row_group_indexes.clone();
self.row_group_indexes = sorted_indices
.values()
.iter()
.map(|&i| original_indexes[i as usize])
.collect();

Ok(self)
}

/// Reverse the access plan for reverse scanning
pub(crate) fn reverse(mut self, file_metadata: &ParquetMetaData) -> Result<Self> {
// Get the row group indexes before reversing
@@ -614,4 +733,250 @@ mod test {
.unwrap();
Arc::new(SchemaDescriptor::new(Arc::new(schema)))
}

// ---- reorder_by_statistics tests ----

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use parquet::basic::Type as PhysicalType;
use parquet::file::metadata::FileMetaData;
use parquet::file::statistics::Statistics;
use parquet::schema::types::Type as SchemaType;

/// Create ParquetMetaData with row groups that have Int32 min/max stats
fn make_metadata_with_stats(min_max_pairs: &[(i32, i32)]) -> ParquetMetaData {
let field = SchemaType::primitive_type_builder("id", PhysicalType::INT32)
.build()
.unwrap();
let schema = SchemaType::group_type_builder("schema")
.with_fields(vec![Arc::new(field)])
.build()
.unwrap();
let schema_descr = Arc::new(SchemaDescriptor::new(Arc::new(schema)));

let row_groups: Vec<RowGroupMetaData> = min_max_pairs
.iter()
.map(|(min, max)| {
let stats =
Statistics::int32(Some(*min), Some(*max), None, Some(100), false);
let column = ColumnChunkMetaData::builder(schema_descr.column(0))
.set_num_values(100)
.set_statistics(stats)
.build()
.unwrap();
RowGroupMetaData::builder(schema_descr.clone())
.set_num_rows(100)
.set_column_metadata(vec![column])
.build()
.unwrap()
})
.collect();

let file_meta = FileMetaData::new(
1,
min_max_pairs.len() as i64 * 100,
None,
None,
schema_descr,
None,
);
ParquetMetaData::new(file_meta, row_groups)
}

fn make_sort_order_asc() -> LexOrdering {
LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new(Column::new(
"id", 0,
)))])
.unwrap()
}

fn make_sort_order_desc() -> LexOrdering {
use arrow::compute::SortOptions;
LexOrdering::new(vec![PhysicalSortExpr::new(
Arc::new(Column::new("id", 0)),
SortOptions {
descending: true,
nulls_first: false,
},
)])
.unwrap()
}

fn make_arrow_schema() -> Schema {
Schema::new(vec![Field::new("id", DataType::Int32, false)])
}

#[test]
fn test_reorder_by_statistics_asc() {
// RGs in wrong order: [50-99, 200-299, 1-30]
let metadata = make_metadata_with_stats(&[(50, 99), (200, 299), (1, 30)]);
let schema = make_arrow_schema();
let sort_order = make_sort_order_asc();

let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap();
let plan = plan
.reorder_by_statistics(&sort_order, &metadata, &schema)
.unwrap();

// Should be reordered: RG2(1-30), RG0(50-99), RG1(200-299)
assert_eq!(plan.row_group_indexes, vec![2, 0, 1]);
}

#[test]
fn test_reorder_by_statistics_desc() {
// RGs: [50-99, 200-299, 1-30]
let metadata = make_metadata_with_stats(&[(50, 99), (200, 299), (1, 30)]);
let schema = make_arrow_schema();
let sort_order = make_sort_order_desc();

let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap();
let plan = plan
.reorder_by_statistics(&sort_order, &metadata, &schema)
.unwrap();

// DESC: largest max first: RG1(max=299), RG0(max=99), RG2(max=30)
assert_eq!(plan.row_group_indexes, vec![1, 0, 2]);
}

#[test]
fn test_reorder_by_statistics_single_rg() {
let metadata = make_metadata_with_stats(&[(1, 100)]);
let schema = make_arrow_schema();
let sort_order = make_sort_order_asc();

let plan = PreparedAccessPlan::new(vec![0], None).unwrap();
let plan = plan
.reorder_by_statistics(&sort_order, &metadata, &schema)
.unwrap();

// Single RG, no reorder
assert_eq!(plan.row_group_indexes, vec![0]);
}

#[test]
fn test_reorder_by_statistics_with_skipped_rgs() {
// 4 RGs but only 0, 2, 3 are selected (RG1 was pruned)
let metadata =
make_metadata_with_stats(&[(300, 400), (100, 200), (1, 50), (50, 99)]);
let schema = make_arrow_schema();
let sort_order = make_sort_order_asc();

let plan = PreparedAccessPlan::new(vec![0, 2, 3], None).unwrap();
let plan = plan
.reorder_by_statistics(&sort_order, &metadata, &schema)
.unwrap();

// Reorder selected RGs by min: RG2(1-50), RG3(50-99), RG0(300-400)
assert_eq!(plan.row_group_indexes, vec![2, 3, 0]);
}

#[test]
fn test_reorder_by_statistics_skips_with_row_selection() {
let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]);
let schema = make_arrow_schema();
let sort_order = make_sort_order_asc();

let selection = RowSelection::from(vec![
RowSelector::select(50),
RowSelector::skip(50),
RowSelector::select(100),
]);

let plan = PreparedAccessPlan::new(vec![0, 1], Some(selection)).unwrap();
let plan = plan
.reorder_by_statistics(&sort_order, &metadata, &schema)
.unwrap();

// Should NOT reorder because row_selection is present
assert_eq!(plan.row_group_indexes, vec![0, 1]);
}

#[test]
fn test_reorder_by_statistics_already_sorted() {
// Already in correct ASC order
let metadata = make_metadata_with_stats(&[(1, 30), (50, 99), (200, 299)]);
let schema = make_arrow_schema();
let sort_order = make_sort_order_asc();

let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap();
let plan = plan
.reorder_by_statistics(&sort_order, &metadata, &schema)
.unwrap();

// Already sorted, order preserved
assert_eq!(plan.row_group_indexes, vec![0, 1, 2]);
}

#[test]
fn test_reorder_by_statistics_skips_non_column_expr() {
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::BinaryExpr;

let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]);
let schema = make_arrow_schema();

// Sort expression is a binary expression (id + 1), not a simple column
let expr = Arc::new(BinaryExpr::new(
Arc::new(Column::new("id", 0)),
Operator::Plus,
Arc::new(datafusion_physical_expr::expressions::Literal::new(
datafusion_common::ScalarValue::Int32(Some(1)),
)),
));
let sort_order =
LexOrdering::new(vec![PhysicalSortExpr::new_default(expr)]).unwrap();

let plan = PreparedAccessPlan::new(vec![0, 1], None).unwrap();
let plan = plan
.reorder_by_statistics(&sort_order, &metadata, &schema)
.unwrap();

// Should NOT reorder because sort expr is not a simple column
assert_eq!(plan.row_group_indexes, vec![0, 1]);
}

#[test]
fn test_reorder_by_statistics_desc_uses_max_for_overlapping_rgs() {
// Overlapping ranges where min DESC would pick worse RG than max DESC:
// RG0: 50-60 (small range, moderate max)
// RG1: 40-100 (wide range, high max but lower min)
// RG2: 20-30 (low max)
//
// For ORDER BY DESC LIMIT N:
// Using min DESC: [RG0(50), RG1(40), RG2(20)] → reads RG0 first (max=60 only)
// Using max DESC: [RG1(100), RG0(60), RG2(30)] → reads RG1 first (max=100)
//
// RG1 is the better first choice for DESC because it contains the largest values.
let metadata = make_metadata_with_stats(&[(50, 60), (40, 100), (20, 30)]);
let schema = make_arrow_schema();
let sort_order = make_sort_order_desc();

let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap();
let plan = plan
.reorder_by_statistics(&sort_order, &metadata, &schema)
.unwrap();

// Expected: RG1 (max=100) first, then RG0 (max=60), then RG2 (max=30)
assert_eq!(plan.row_group_indexes, vec![1, 0, 2]);
}

#[test]
fn test_reorder_by_statistics_skips_missing_column() {
let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]);
// Schema has "id" but sort order references "nonexistent"
let schema = make_arrow_schema();
let sort_order = LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new(
Column::new("nonexistent", 99),
))])
.unwrap();

let plan = PreparedAccessPlan::new(vec![0, 1], None).unwrap();
let plan = plan
.reorder_by_statistics(&sort_order, &metadata, &schema)
.unwrap();

// Should NOT reorder because column not found in schema
assert_eq!(plan.row_group_indexes, vec![0, 1]);
}
}